use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::event_log::EventLog;
use sha2::{Digest, Sha256};
use super::super::{
evaluate_context_pack_suggestion_expectations, generate_context_pack_suggestions, new_id,
normalize_friction_events_json, now_rfc3339, parse_json_value, run_persona_eval_ladder,
ContextPackSuggestionExpectation, ContextPackSuggestionOptions, FrictionEvent,
};
use super::diff::diff_run_records;
use super::json::{clarifying_max_questions, clarifying_min_questions, normalize_question_text};
use super::persistence::load_run_record;
use super::types::{
EvalLedgerAppendReport, EvalLedgerFingerprintMismatch, EvalLedgerPriorCommitReport,
EvalLedgerProvenance, EvalLedgerReadReport, EvalLedgerResumeCell, EvalLedgerResumePlan,
EvalLedgerRow, EvalPackAssertion, EvalPackCase, EvalPackCaseReport, EvalPackFixtureRef,
EvalPackManifest, EvalPackReliabilityBreakdown, EvalPackReliabilityReport, EvalPackReport,
EvalPackRubric, EvalPackRunState, EvalPackSplitValidationReport, EvalPackStatsReport,
EvalPackStatsRow, EvalPackTrialReport, EvalSuiteManifest, ReplayEvalCaseReport,
ReplayEvalReport, ReplayEvalSuiteReport, ReplayFixture, ReplayStageAssertion, RunDiffReport,
RunRecord, RunStageRecord,
};
use crate::value::{VmError, VmValue};
/// Schema tag stamped onto ledger rows that arrive without one and required of stored rows when they are read back.
const EVAL_LEDGER_ROW_SCHEMA: &str = "harn.eval.ledger.row.v1";
/// Schema tag written into the run-state record produced when a pack ledger run finishes.
const EVAL_LEDGER_RUN_STATE_SCHEMA: &str = "harn.eval.run-state.v1";
/// Schema tag written into resume plans.
const EVAL_LEDGER_RESUME_PLAN_SCHEMA: &str = "harn.eval.resume-plan.v1";
/// Event kind used for ledger row events; events of any other kind are ignored when rows are read.
const EVAL_LEDGER_ROW_KIND: &str = "eval.ledger.row";
/// Event kind used for the run-state event appended at the end of a pack run.
const EVAL_LEDGER_RUN_STATE_KIND: &str = "eval.ledger.run_state";
/// Prefix of every ledger topic; the sanitized namespace is appended after a `.`.
const EVAL_LEDGER_TOPIC_PREFIX: &str = "eval.ledger";
/// Header key that carries a row's identity hash and drives idempotent appends.
const EVAL_LEDGER_IDENTITY_HEADER: &str = "eval_ledger_identity";
/// Queue depth of the in-memory event log created when no other log is active or installable.
const EVAL_LEDGER_QUEUE_DEPTH: usize =
crate::runtime_limits::RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Maximum number of events requested per `read_range` call while scanning a ledger topic.
const EVAL_LEDGER_READ_BATCH_LIMIT: usize = 1024;
/// Caller-supplied options for ledger reads and appends.
///
/// Every field is optional; blank strings are turned into `None` by
/// `eval_ledger_options` after deserialization.
#[derive(Clone, Debug, Default, serde::Deserialize)]
#[serde(default)]
struct EvalLedgerOptions {
/// Ledger namespace used to derive the topic; when absent the suite (or a fallback) is used.
namespace: Option<String>,
/// Suite name: exact-match read filter and default for rows that have no suite.
suite: Option<String>,
/// Model name: exact-match read filter and default for rows that have no model.
model: Option<String>,
/// Split name: read filter (row split must equal it) and default for rows without a split.
split: Option<String>,
/// Commit: exact-match read filter, default row commit, and first choice for provenance.
commit: Option<String>,
/// Branch override used when building provenance.
branch: Option<String>,
/// Case name filter/default; also accepted under the key `case`.
#[serde(alias = "case")]
case_name: Option<String>,
/// Expected case fingerprint: read filter, row default, and mismatch reference.
case_fingerprint: Option<String>,
/// Expected harness config fingerprint: read filter, row default, and mismatch reference.
harness_config_fingerprint: Option<String>,
/// Maximum number of matching rows a read returns.
limit: Option<usize>,
}
/// In-progress ledger session for one eval pack run.
///
/// Created by `EvalPackLedgerRun::start`, which resolves suite/model/commit and
/// preloads the rows already recorded for that combination.
struct EvalPackLedgerRun {
/// Event log the ledger rows and run-state record are written to.
log: Arc<crate::event_log::AnyEventLog>,
/// Ledger topic derived from the resolved namespace.
topic: crate::event_log::Topic,
/// Rows loaded at start (filtered by suite/model/commit) plus rows appended during the run.
rows: Vec<EvalLedgerRow>,
/// Resolved suite name.
suite: String,
/// Resolved model name (`"unknown"` when nothing supplies one).
model: String,
/// Resolved commit the run is attributed to.
commit: String,
/// Branch copied from the provenance captured at start.
branch: Option<String>,
/// Provenance captured at start.
provenance: EvalLedgerProvenance,
/// Number of rows newly inserted by this run.
inserted: usize,
/// Number of appended rows that were already present (idempotent duplicates).
duplicates: usize,
/// Rows found for a requested cell whose fingerprints did not match the expected ones.
fingerprint_refusals: Vec<EvalLedgerFingerprintMismatch>,
}
/// Decodes an eval suite manifest from a VM value and fills in identity defaults.
///
/// An empty `type_name` becomes `"eval_suite_manifest"`, and an empty `id` is
/// replaced with a freshly generated `eval_suite` id.
///
/// # Errors
/// Propagates the error from `parse_json_value` when the value cannot be decoded.
pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
    let mut suite: EvalSuiteManifest = parse_json_value(value)?;

    let needs_type_name = suite.type_name.is_empty();
    if needs_type_name {
        suite.type_name = String::from("eval_suite_manifest");
    }

    let needs_id = suite.id.is_empty();
    if needs_id {
        suite.id = new_id("eval_suite");
    }

    Ok(suite)
}
/// Reads an eval suite manifest from a JSON file.
///
/// When the manifest does not declare a `base_dir`, the directory containing
/// `path` (if it has a parent) is recorded instead.
///
/// # Errors
/// Returns `VmError::Runtime` when the file cannot be read or is not valid
/// manifest JSON.
pub fn load_eval_suite_manifest(path: &Path) -> Result<EvalSuiteManifest, VmError> {
    let raw = match std::fs::read_to_string(path) {
        Ok(raw) => raw,
        Err(e) => {
            return Err(VmError::Runtime(format!(
                "failed to read eval suite manifest: {e}"
            )))
        }
    };
    let mut suite: EvalSuiteManifest = match serde_json::from_str(&raw) {
        Ok(suite) => suite,
        Err(e) => {
            return Err(VmError::Runtime(format!(
                "failed to parse eval suite manifest: {e}"
            )))
        }
    };
    // Keep an explicit base_dir; otherwise fall back to the manifest's folder.
    suite.base_dir = suite
        .base_dir
        .take()
        .or_else(|| path.parent().map(|dir| dir.display().to_string()));
    Ok(suite)
}
/// Loads an eval pack manifest from disk and normalizes it.
///
/// A file whose extension is `json` is parsed as JSON; the comparison is ASCII
/// case-insensitive, so `pack.JSON` is treated as JSON rather than being fed to
/// the TOML parser. Every other path is parsed as TOML. After parsing, the
/// manifest is normalized and, if it declares no `base_dir`, the directory
/// containing `path` (if any) is recorded.
///
/// # Errors
/// Returns `VmError::Runtime` when the file cannot be read or parsed, and
/// propagates any error from manifest normalization.
pub fn load_eval_pack_manifest(path: &Path) -> Result<EvalPackManifest, VmError> {
    let content = std::fs::read_to_string(path)
        .map_err(|e| VmError::Runtime(format!("failed to read eval pack manifest: {e}")))?;
    // Extensions are matched case-insensitively so `.JSON` is not misread as TOML.
    let is_json = path
        .extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext.eq_ignore_ascii_case("json"));
    let mut manifest: EvalPackManifest = if is_json {
        serde_json::from_str(&content)
            .map_err(|e| VmError::Runtime(format!("failed to parse eval pack JSON: {e}")))?
    } else {
        toml::from_str(&content)
            .map_err(|e| VmError::Runtime(format!("failed to parse eval pack TOML: {e}")))?
    };
    normalize_eval_pack_manifest(&mut manifest)?;
    if manifest.base_dir.is_none() {
        manifest.base_dir = path.parent().map(|parent| parent.display().to_string());
    }
    Ok(manifest)
}
/// Decodes an eval pack manifest from a VM value and normalizes it.
///
/// # Errors
/// Propagates decoding errors from `parse_json_value` and validation errors
/// from manifest normalization.
pub fn normalize_eval_pack_manifest_value(value: &VmValue) -> Result<EvalPackManifest, VmError> {
    let mut pack: EvalPackManifest = parse_json_value(value)?;
    normalize_eval_pack_manifest(&mut pack).map(|()| pack)
}
fn normalize_eval_pack_manifest(manifest: &mut EvalPackManifest) -> Result<(), VmError> {
if manifest.version == 0 {
manifest.version = 1;
}
if manifest.trials == 0 {
manifest.trials = 1;
}
if manifest.id.is_empty() {
manifest.id = manifest
.name
.clone()
.filter(|name| !name.trim().is_empty())
.unwrap_or_else(|| new_id("eval_pack"));
}
let rubrics_by_id = manifest
.rubrics
.iter()
.filter(|rubric| !rubric.id.is_empty())
.map(|rubric| (rubric.id.as_str(), rubric))
.collect::<BTreeMap<_, _>>();
let fixtures_by_id = manifest
.fixtures
.iter()
.filter(|fixture| !fixture.id.is_empty())
.map(|fixture| (fixture.id.as_str(), fixture))
.collect::<BTreeMap<_, _>>();
for case in &mut manifest.cases {
if case.trials == Some(0) {
return Err(VmError::Runtime(format!(
"eval pack case '{}' has trials = 0",
case.id.as_deref().unwrap_or("<unnamed>")
)));
}
case.case_fingerprint =
eval_pack_case_fingerprint_with_refs(case, &rubrics_by_id, &fixtures_by_id)?;
}
for ladder in &mut manifest.ladders {
super::super::normalize_persona_eval_ladder_manifest(ladder);
}
Ok(())
}
/// Fingerprints a single eval pack case without resolving rubric or fixture references.
///
/// # Errors
/// Returns `VmError::Runtime` when part of the case cannot be encoded as JSON.
pub fn eval_pack_case_fingerprint(case: &EvalPackCase) -> Result<String, VmError> {
    let no_rubrics = BTreeMap::new();
    let no_fixtures = BTreeMap::new();
    eval_pack_case_fingerprint_with_refs(case, &no_rubrics, &no_fixtures)
}
/// Fingerprints an eval pack case, folding in the rubrics and fixture it references.
///
/// The hashed payload has four sections:
/// * `task`: `run`, `run_path`, `friction_events`
/// * `expected_outputs`: `fixture`, `fixture_path`, and `fixture_ref` when the
///   fixture key (`fixture`, else `fixture_path`) is found in `fixtures_by_id`
/// * `verify`: `compare_to`, `rubric_ids`, and the bodies of the rubrics that
///   resolve through `rubrics_by_id` (unresolved ids are skipped)
/// * `flags`: `severity`, `thresholds`, `metadata`
///
/// # Errors
/// Returns `VmError::Runtime` when any part cannot be encoded as JSON.
fn eval_pack_case_fingerprint_with_refs(
    case: &EvalPackCase,
    rubrics_by_id: &BTreeMap<&str, &EvalPackRubric>,
    fixtures_by_id: &BTreeMap<&str, &EvalPackFixtureRef>,
) -> Result<String, VmError> {
    let mut task_section = BTreeMap::new();
    insert_json_field(&mut task_section, "run", &case.run)?;
    insert_json_field(&mut task_section, "run_path", &case.run_path)?;
    insert_json_field(&mut task_section, "friction_events", &case.friction_events)?;

    let mut output_section = BTreeMap::new();
    insert_json_field(&mut output_section, "fixture", &case.fixture)?;
    insert_json_field(&mut output_section, "fixture_path", &case.fixture_path)?;
    let fixture_key = case.fixture.as_deref().or(case.fixture_path.as_deref());
    if let Some(fixture) = fixture_key.and_then(|key| fixtures_by_id.get(key)) {
        insert_json_field(&mut output_section, "fixture_ref", *fixture)?;
    }

    // Only rubrics that resolve contribute their body; the raw id list is hashed separately.
    let mut rubric_bodies = Vec::new();
    for rubric_id in case.rubrics.iter() {
        let Some(rubric) = rubrics_by_id.get(rubric_id.as_str()) else {
            continue;
        };
        let encoded = serde_json::to_value(rubric)
            .map_err(|e| VmError::Runtime(format!("failed to encode eval pack rubric: {e}")))?;
        rubric_bodies.push(encoded);
    }

    let mut verify_section = BTreeMap::new();
    insert_json_field(&mut verify_section, "compare_to", &case.compare_to)?;
    insert_json_field(&mut verify_section, "rubric_ids", &case.rubrics)?;
    verify_section.insert(
        "rubrics".to_string(),
        serde_json::Value::Array(rubric_bodies),
    );

    let mut flag_section = BTreeMap::new();
    insert_json_field(&mut flag_section, "severity", &case.severity)?;
    insert_json_field(&mut flag_section, "thresholds", &case.thresholds)?;
    insert_json_field(&mut flag_section, "metadata", &case.metadata)?;

    let mut document = BTreeMap::new();
    document.insert("task".to_string(), encode_json(&task_section)?);
    document.insert(
        "expected_outputs".to_string(),
        encode_json(&output_section)?,
    );
    document.insert("verify".to_string(), encode_json(&verify_section)?);
    document.insert("flags".to_string(), encode_json(&flag_section)?);
    fingerprint_json(&document)
}
/// Fingerprints the harness configuration of an eval pack.
///
/// The hashed payload contains the manifest judge, the default judge, the
/// package, a whitelisted subset of manifest metadata, and for every rubric its
/// `id`, `kind`, `prompt` and `judge`.
///
/// # Errors
/// Returns `VmError::Runtime` when any part cannot be encoded as JSON.
pub fn eval_pack_harness_config_fingerprint(
    manifest: &EvalPackManifest,
) -> Result<String, VmError> {
    // Metadata keys that count as harness configuration.
    const HARNESS_METADATA_KEYS: &[&str] = &[
        "model",
        "provider",
        "route",
        "prompt",
        "promptVersion",
        "prompt_version",
        "toolFormat",
        "tool_format",
        "pipelineRev",
        "pipeline_rev",
        "pipelineRevision",
        "pipeline_revision",
        "harnVersion",
        "harn_version",
        "harness",
        "harnessConfig",
        "harness_config",
    ];

    let mut rubric_harness = Vec::with_capacity(manifest.rubrics.len());
    for rubric in manifest.rubrics.iter() {
        let mut entry = BTreeMap::new();
        insert_json_field(&mut entry, "id", &rubric.id)?;
        insert_json_field(&mut entry, "kind", &rubric.kind)?;
        insert_json_field(&mut entry, "prompt", &rubric.prompt)?;
        insert_json_field(&mut entry, "judge", &rubric.judge)?;
        rubric_harness.push(encode_json(&entry)?);
    }

    let harness_metadata: BTreeMap<String, serde_json::Value> = HARNESS_METADATA_KEYS
        .iter()
        .filter_map(|key| {
            manifest
                .metadata
                .get(*key)
                .map(|value| ((*key).to_string(), value.clone()))
        })
        .collect();

    let mut document = BTreeMap::new();
    insert_json_field(&mut document, "manifest_judge", &manifest.judge)?;
    insert_json_field(&mut document, "default_judge", &manifest.defaults.judge)?;
    insert_json_field(&mut document, "package", &manifest.package)?;
    document.insert(
        "harness_metadata".to_string(),
        encode_json(&harness_metadata)?,
    );
    document.insert(
        "rubric_harness".to_string(),
        serde_json::Value::Array(rubric_harness),
    );
    fingerprint_json(&document)
}
/// Encodes `value` as JSON and stores it in `map` under `key`, replacing any previous entry.
///
/// # Errors
/// Returns the `encode_json` error when `value` cannot be serialized; the map
/// is left untouched in that case.
fn insert_json_field<T: serde::Serialize>(
    map: &mut BTreeMap<String, serde_json::Value>,
    key: &str,
    value: &T,
) -> Result<(), VmError> {
    let encoded = encode_json(value)?;
    map.insert(key.to_owned(), encoded);
    Ok(())
}
/// Converts any serializable value into a `serde_json::Value`.
///
/// # Errors
/// Returns `VmError::Runtime` describing the serialization failure.
fn encode_json<T: serde::Serialize>(value: &T) -> Result<serde_json::Value, VmError> {
    match serde_json::to_value(value) {
        Ok(encoded) => Ok(encoded),
        Err(e) => Err(VmError::Runtime(format!(
            "failed to encode eval pack fingerprint: {e}"
        ))),
    }
}
/// Produces a short fingerprint: the first 16 hex characters of the SHA-256 of
/// the value's JSON serialization.
///
/// # Errors
/// Returns `VmError::Runtime` when the value cannot be serialized.
fn fingerprint_json<T: serde::Serialize>(value: &T) -> Result<String, VmError> {
    let bytes = match serde_json::to_vec(value) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(VmError::Runtime(format!(
                "failed to encode eval pack fingerprint: {e}"
            )))
        }
    };
    // Hex output is ASCII, so truncating by byte length equals taking 16 chars.
    let mut fingerprint = hex::encode(Sha256::digest(bytes));
    fingerprint.truncate(16);
    Ok(fingerprint)
}
/// Reads the ledger rows that match `options` and wraps them in a read report.
///
/// The topic comes from the resolved namespace. The call blocks the current
/// thread until the read completes.
///
/// # Errors
/// Returns `VmError::Runtime` when the options cannot be parsed, the topic
/// name is rejected, or the event log read fails.
pub fn eval_ledger_read_report(
    options: Option<serde_json::Value>,
) -> Result<EvalLedgerReadReport, VmError> {
    let filters = eval_ledger_options(options)?;
    let topic = eval_ledger_topic(&eval_ledger_namespace(&filters))?;
    let log = ensure_eval_ledger_event_log(None);
    let pending = read_eval_ledger_rows(&log, &topic, &filters);
    let rows = futures::executor::block_on(pending)?;
    Ok(EvalLedgerReadReport { rows })
}
/// Normalizes the given row(s) and appends them to the ledger idempotently.
///
/// `rows` may be a single row object or an array of row objects. Missing row
/// fields are filled from `options` and from freshly captured provenance before
/// the append. The call blocks the current thread until the append completes.
///
/// # Errors
/// Returns `VmError::Runtime` when options or rows cannot be parsed, the topic
/// name is rejected, or the event log append fails.
pub fn eval_ledger_append_rows_report(
    rows: serde_json::Value,
    options: Option<serde_json::Value>,
) -> Result<EvalLedgerAppendReport, VmError> {
    let defaults = eval_ledger_options(options)?;
    let topic = eval_ledger_topic(&eval_ledger_namespace(&defaults))?;
    let provenance = eval_ledger_provenance(None, &defaults, None);

    let mut normalized = parse_eval_ledger_rows(rows)?;
    for row in normalized.iter_mut() {
        normalize_eval_ledger_row(row, &defaults, &provenance);
    }

    let log = ensure_eval_ledger_event_log(None);
    futures::executor::block_on(append_eval_ledger_rows(&log, &topic, normalized))
}
pub fn eval_ledger_prior_commit_rows_report(
options: serde_json::Value,
) -> Result<EvalLedgerPriorCommitReport, VmError> {
let options = eval_ledger_options(Some(options))?;
let namespace = eval_ledger_namespace(&options);
let topic = eval_ledger_topic(&namespace)?;
let log = ensure_eval_ledger_event_log(None);
let mut read_options = options.clone();
read_options.commit = None;
read_options.case_fingerprint = None;
read_options.harness_config_fingerprint = None;
let rows = futures::executor::block_on(read_eval_ledger_rows(&log, &topic, &read_options))?;
Ok(prior_commit_report(rows, &options))
}
/// Builds a resume plan for an eval pack from what the ledger already holds.
///
/// The split is validated first and the harness fingerprint computed. Suite,
/// model and commit are resolved from the options, falling back to the manifest
/// id, the manifest model (or `"unknown"`), and captured provenance
/// respectively. Rows recorded for that suite/model/commit are then read and
/// compared against every case/trial cell of the manifest.
///
/// # Errors
/// Returns `VmError::Runtime` when the split is invalid, the fingerprint or
/// options cannot be computed/parsed, the topic name is rejected, or the event
/// log read fails.
pub fn eval_ledger_resume_plan_report(
    manifest: &EvalPackManifest,
    options: Option<serde_json::Value>,
) -> Result<EvalLedgerResumePlan, VmError> {
    let split_report = validate_eval_pack_split(manifest)?;
    let harness_fingerprint = eval_pack_harness_config_fingerprint(manifest)?;
    let options = eval_ledger_options(options)?;
    let base_dir = manifest.base_dir.as_deref().map(Path::new);

    let suite = match &options.suite {
        Some(suite) => suite.clone(),
        None => manifest.id.clone(),
    };
    let model = match &options.model {
        Some(model) => model.clone(),
        None => eval_pack_manifest_model(manifest).unwrap_or_else(|| "unknown".to_string()),
    };
    let provenance = eval_ledger_provenance(base_dir, &options, Some(&manifest.metadata));
    let commit = match &options.commit {
        Some(commit) => commit.clone(),
        None => provenance.commit.clone(),
    };

    let topic = eval_ledger_topic(&eval_pack_ledger_namespace(manifest, &options))?;
    let log = ensure_eval_ledger_event_log(base_dir);
    let filters = eval_pack_ledger_read_options(&suite, &model, &commit);
    let recorded = futures::executor::block_on(read_eval_ledger_rows(&log, &topic, &filters))?;

    let plan = build_eval_ledger_resume_plan(
        manifest,
        &split_report,
        &recorded,
        &suite,
        &model,
        &commit,
        &harness_fingerprint,
    );
    Ok(plan)
}
fn eval_ledger_options(value: Option<serde_json::Value>) -> Result<EvalLedgerOptions, VmError> {
let mut options = match value {
None | Some(serde_json::Value::Null) => EvalLedgerOptions::default(),
Some(value) => serde_json::from_value(value)
.map_err(|e| VmError::Runtime(format!("eval ledger options parse error: {e}")))?,
};
normalize_optional_string(&mut options.namespace);
normalize_optional_string(&mut options.suite);
normalize_optional_string(&mut options.model);
normalize_optional_string(&mut options.split);
normalize_optional_string(&mut options.commit);
normalize_optional_string(&mut options.branch);
normalize_optional_string(&mut options.case_name);
normalize_optional_string(&mut options.case_fingerprint);
normalize_optional_string(&mut options.harness_config_fingerprint);
Ok(options)
}
/// Replaces a blank (empty or whitespace-only) string with `None`.
///
/// Non-blank values are left exactly as they are, including any surrounding
/// whitespace.
fn normalize_optional_string(value: &mut Option<String>) {
    // A whitespace-only string trims down to "".
    let is_blank = value.as_deref().map(str::trim) == Some("");
    if is_blank {
        *value = None;
    }
}
fn parse_eval_ledger_rows(value: serde_json::Value) -> Result<Vec<EvalLedgerRow>, VmError> {
match value {
serde_json::Value::Array(_) => serde_json::from_value(value)
.map_err(|e| VmError::Runtime(format!("eval ledger rows parse error: {e}"))),
serde_json::Value::Object(_) => serde_json::from_value(value)
.map(|row| vec![row])
.map_err(|e| VmError::Runtime(format!("eval ledger row parse error: {e}"))),
_ => Err(VmError::Runtime(
"eval ledger rows must be a row dict or list of row dicts".to_string(),
)),
}
}
/// Resolves the ledger namespace: explicit namespace, else suite, else `"default"`.
fn eval_ledger_namespace(options: &EvalLedgerOptions) -> String {
    if let Some(namespace) = &options.namespace {
        return namespace.clone();
    }
    if let Some(suite) = &options.suite {
        return suite.clone();
    }
    "default".to_string()
}
/// Resolves the ledger namespace for an eval pack.
///
/// Precedence: explicit `namespace` option, manifest metadata
/// (`ledger_namespace` / `ledgerNamespace`), `suite` option, manifest id.
fn eval_pack_ledger_namespace(manifest: &EvalPackManifest, options: &EvalLedgerOptions) -> String {
    if let Some(namespace) = &options.namespace {
        return namespace.clone();
    }
    if let Some(from_metadata) =
        metadata_string(&manifest.metadata, &["ledger_namespace", "ledgerNamespace"])
    {
        return from_metadata;
    }
    match &options.suite {
        Some(suite) => suite.clone(),
        None => manifest.id.clone(),
    }
}
/// Builds read options that filter on exactly suite, model and commit.
fn eval_pack_ledger_read_options(suite: &str, model: &str, commit: &str) -> EvalLedgerOptions {
    let mut filters = EvalLedgerOptions::default();
    filters.suite = Some(suite.to_owned());
    filters.model = Some(model.to_owned());
    filters.commit = Some(commit.to_owned());
    filters
}
fn eval_ledger_topic(namespace: &str) -> Result<crate::event_log::Topic, VmError> {
let safe_namespace = crate::event_log::sanitize_topic_component(namespace);
crate::event_log::Topic::new(format!("{EVAL_LEDGER_TOPIC_PREFIX}.{safe_namespace}"))
.map_err(eval_ledger_log_error)
}
/// Returns an event log to use for the ledger, installing one if necessary.
///
/// Order of preference:
/// 1. the already active event log;
/// 2. a lazily installed default log rooted at `base_dir`, or at the current
///    working directory when no `base_dir` is given;
/// 3. an in-memory log for the current thread.
///
/// Install failures are not reported; they fall through to the in-memory log.
fn ensure_eval_ledger_event_log(base_dir: Option<&Path>) -> Arc<crate::event_log::AnyEventLog> {
    if let Some(active) = crate::event_log::active_event_log() {
        return active;
    }

    // An explicit base_dir never falls back to the working directory.
    let install_root = match base_dir {
        Some(dir) => Some(dir.to_path_buf()),
        None => std::env::current_dir().ok(),
    };
    if let Some(root) = install_root {
        let installed = crate::event_log::install_lazy_default_for_base_dir(&root).is_ok();
        if installed {
            if let Some(active) = crate::event_log::active_event_log() {
                return active;
            }
        }
    }

    crate::event_log::install_memory_for_current_thread(EVAL_LEDGER_QUEUE_DEPTH)
}
/// Scans a ledger topic from the beginning and returns the rows matching `options`.
///
/// Events are fetched in batches of `EVAL_LEDGER_READ_BATCH_LIMIT`, resuming
/// after the last event id seen, until an empty batch is returned. Events that
/// are not valid ledger rows are skipped. When `options.limit` is set, the scan
/// stops as soon as that many matching rows were collected; a limit of `0`
/// returns no rows at all (previously one row slipped through because the limit
/// was only checked after a row had been pushed).
///
/// # Errors
/// Returns `VmError::Runtime` when the event log read fails.
async fn read_eval_ledger_rows(
    log: &Arc<crate::event_log::AnyEventLog>,
    topic: &crate::event_log::Topic,
    options: &EvalLedgerOptions,
) -> Result<Vec<EvalLedgerRow>, VmError> {
    let mut rows = Vec::new();
    // A zero limit can never be satisfied by pushing a row, so answer up front.
    if options.limit == Some(0) {
        return Ok(rows);
    }
    let mut cursor = None;
    loop {
        let batch = log
            .read_range(topic, cursor, EVAL_LEDGER_READ_BATCH_LIMIT)
            .await
            .map_err(eval_ledger_log_error)?;
        if batch.is_empty() {
            break;
        }
        for (event_id, event) in batch {
            // Advance the cursor for every event, including ones that are skipped.
            cursor = Some(event_id);
            let Some(row) = parse_eval_ledger_row(event_id, event) else {
                continue;
            };
            if !eval_ledger_row_matches(&row, options) {
                continue;
            }
            rows.push(row);
            if options.limit.is_some_and(|limit| rows.len() >= limit) {
                return Ok(rows);
            }
        }
    }
    Ok(rows)
}
/// Appends rows to the ledger topic, deduplicating on each row's identity hash.
///
/// Each row is written with headers for its identity, suite, model, commit,
/// case name and trial. The report counts requested rows (`appended`), newly
/// inserted rows and duplicates, and collects the event id and the stored row
/// for every append. The log is flushed once after all rows are processed.
///
/// # Errors
/// Returns `VmError::Runtime` when a row cannot be encoded or the event log
/// append/flush fails. Processing stops at the first failure.
async fn append_eval_ledger_rows(
    log: &Arc<crate::event_log::AnyEventLog>,
    topic: &crate::event_log::Topic,
    rows: Vec<EvalLedgerRow>,
) -> Result<EvalLedgerAppendReport, VmError> {
    let requested = rows.len();
    let every_row_skipped = requested > 0 && rows.iter().all(eval_ledger_row_is_skip);
    let mut report = EvalLedgerAppendReport {
        appended: requested,
        all_skipped: every_row_skipped,
        ..EvalLedgerAppendReport::default()
    };

    for row in rows {
        let identity = eval_ledger_row_identity(&row)?;
        let headers = BTreeMap::from([
            (EVAL_LEDGER_IDENTITY_HEADER.to_string(), identity.clone()),
            ("suite".to_string(), row.suite.clone()),
            ("model".to_string(), row.model.clone()),
            ("commit".to_string(), row.commit.clone()),
            ("case_name".to_string(), row.case_name.clone()),
            ("trial".to_string(), row.trial.to_string()),
        ]);
        let payload = serde_json::to_value(&row)
            .map_err(|e| VmError::Runtime(format!("eval ledger row encode error: {e}")))?;
        let event =
            crate::event_log::LogEvent::new(EVAL_LEDGER_ROW_KIND, payload).with_headers(headers);

        let outcome = log
            .append_idempotent_by_header(topic, EVAL_LEDGER_IDENTITY_HEADER, &identity, event)
            .await
            .map_err(eval_ledger_log_error)?;

        if outcome.inserted {
            report.inserted += 1;
        } else {
            report.duplicates += 1;
        }
        report.event_ids.push(outcome.event_id);
        // The stored event is decoded again so the report reflects what the log holds.
        if let Some(stored) = parse_eval_ledger_row(outcome.event_id, outcome.event) {
            report.rows.push(stored);
        }
    }

    log.flush().await.map_err(eval_ledger_log_error)?;
    Ok(report)
}
/// Decodes a log event into a ledger row, tagging it with its event id.
///
/// Returns `None` when the event is not a ledger row event, its payload does
/// not deserialize, or its schema tag differs from the current row schema.
fn parse_eval_ledger_row(
    event_id: crate::event_log::EventId,
    event: crate::event_log::LogEvent,
) -> Option<EvalLedgerRow> {
    if event.kind != EVAL_LEDGER_ROW_KIND {
        return None;
    }
    let decoded: Result<EvalLedgerRow, _> = serde_json::from_value(event.payload);
    match decoded {
        Ok(mut row) if row.schema == EVAL_LEDGER_ROW_SCHEMA => {
            row.event_id = Some(event_id);
            Some(row)
        }
        _ => None,
    }
}
/// Tells whether a row passes every filter set in `options`.
///
/// Suite, model, commit, case name and both fingerprints must equal the
/// row's value when the option is set. A set `split` option requires the row
/// to carry that exact split. Unset options match everything.
fn eval_ledger_row_matches(row: &EvalLedgerRow, options: &EvalLedgerOptions) -> bool {
    let string_filters = [
        (options.suite.as_deref(), row.suite.as_str()),
        (options.model.as_deref(), row.model.as_str()),
        (options.commit.as_deref(), row.commit.as_str()),
        (options.case_name.as_deref(), row.case_name.as_str()),
        (
            options.case_fingerprint.as_deref(),
            row.case_fingerprint.as_str(),
        ),
        (
            options.harness_config_fingerprint.as_deref(),
            row.harness_config_fingerprint.as_str(),
        ),
    ];
    let strings_ok = string_filters
        .iter()
        .all(|(expected, actual)| option_matches(*expected, actual));
    if !strings_ok {
        return false;
    }
    match options.split.as_deref() {
        None => true,
        Some(wanted) => row.split.as_deref() == Some(wanted),
    }
}
/// Returns `true` when no expectation is set or when it equals `actual`.
fn option_matches(expected: Option<&str>, actual: &str) -> bool {
    match expected {
        Some(wanted) => wanted == actual,
        None => true,
    }
}
/// Fills in every field of a ledger row that was left empty or zero.
///
/// Defaults come, in order, from `options`, from values derived from the row
/// itself, and from `provenance`. Fields that already hold a value are never
/// overwritten. The steps are order-dependent: later defaults read fields that
/// earlier steps may have just filled in.
fn normalize_eval_ledger_row(
row: &mut EvalLedgerRow,
options: &EvalLedgerOptions,
provenance: &EvalLedgerProvenance,
) {
if row.schema.is_empty() {
row.schema = EVAL_LEDGER_ROW_SCHEMA.to_string();
}
// Suite falls back to the resolved namespace when no suite option is given.
if row.suite.is_empty() {
row.suite = options
.suite
.clone()
.unwrap_or_else(|| eval_ledger_namespace(options));
}
if row.model.is_empty() {
row.model = options
.model
.clone()
.unwrap_or_else(|| "unknown".to_string());
}
if row.split.is_none() {
row.split = options.split.clone();
}
if row.commit.is_empty() {
row.commit = options
.commit
.clone()
.unwrap_or_else(|| provenance.commit.clone());
}
// `case_name` and `name` default to each other: the option wins for
// `case_name`, then `name` is copied from the (possibly just filled) `case_name`.
if row.case_name.is_empty() {
row.case_name = options
.case_name
.clone()
.filter(|name| !name.is_empty())
.unwrap_or_else(|| row.name.clone());
}
if row.name.is_empty() {
row.name = row.case_name.clone();
}
if row.case_fingerprint.is_empty() {
row.case_fingerprint = options.case_fingerprint.clone().unwrap_or_default();
}
if row.harness_config_fingerprint.is_empty() {
row.harness_config_fingerprint = options
.harness_config_fingerprint
.clone()
.unwrap_or_default();
}
// Trial numbering is 1-based; a zero trial or trial count is treated as "unset".
if row.trial == 0 {
row.trial = 1;
}
if row.trials == 0 {
row.trials = 1;
}
// Derive a status from the counters when none was supplied.
if row.status.is_empty() {
row.status = if row.passes > 0 {
"PASS"
} else if row.fails > 0 {
"FAIL"
} else {
"skip"
}
.to_string();
}
if row.verification.is_empty() {
row.verification = row.status.clone();
}
// Conversely, derive the counters from the status when all of them are zero.
if row.passes + row.fails + row.skips == 0 {
match row.status.to_ascii_uppercase().as_str() {
"PASS" => row.passes = 1,
"FAIL" => row.fails = 1,
_ => row.skips = 1,
}
}
// Only computed when unset and there is at least one pass; `max(1)` guards the division.
if row.pass_rate == 0.0 && row.passes > 0 {
row.pass_rate = row.passes as f64 / row.trials.max(1) as f64;
}
// Row-level provenance: commit mirrors the row commit, the rest comes from `provenance`.
if row.provenance.commit.is_empty() {
row.provenance.commit = row.commit.clone();
}
if row.provenance.branch.is_none() {
row.provenance.branch = provenance.branch.clone();
}
if row.provenance.ts.is_empty() {
row.provenance.ts = provenance.ts.clone();
}
if row.provenance.harn_version.is_empty() {
row.provenance.harn_version = provenance.harn_version.clone();
}
if row.provenance.host.is_empty() {
row.provenance.host = provenance.host.clone();
}
}
/// Computes the idempotency identity of a ledger row.
///
/// The identity is `sha256:` followed by the hex SHA-256 of a JSON object
/// holding the row schema constant plus the row's suite, model, split, commit,
/// case name, both fingerprints and trial number. Outcome fields (status,
/// counters, provenance) are not part of the hash.
///
/// # Errors
/// Returns `VmError::Runtime` when the identity material cannot be serialized.
fn eval_ledger_row_identity(row: &EvalLedgerRow) -> Result<String, VmError> {
// NOTE(review): the digest covers the serialized bytes, so the key order
// serde_json emits for this object affects the identity; confirm before
// restructuring how the material is built.
let material = serde_json::json!({
"schema": EVAL_LEDGER_ROW_SCHEMA,
"suite": row.suite,
"model": row.model,
"split": row.split,
"commit": row.commit,
"case_name": row.case_name,
"case_fingerprint": row.case_fingerprint,
"harness_config_fingerprint": row.harness_config_fingerprint,
"trial": row.trial,
});
let bytes = serde_json::to_vec(&material)
.map_err(|e| VmError::Runtime(format!("eval ledger identity encode error: {e}")))?;
Ok(format!("sha256:{}", hex::encode(Sha256::digest(bytes))))
}
/// Tells whether a row represents a skipped cell: the `skipped` flag is set,
/// the skip counter is positive, or the status is `skip` in any ASCII case.
fn eval_ledger_row_is_skip(row: &EvalLedgerRow) -> bool {
    if row.skipped {
        return true;
    }
    if row.skips > 0 {
        return true;
    }
    row.status.eq_ignore_ascii_case("skip")
}
/// Captures provenance (commit, branch, timestamp, version, host) for ledger rows.
///
/// Commit precedence: `options.commit`, metadata (`commit`, `git_commit`,
/// `source_commit`), environment (`HARN_EVAL_COMMIT`, `HARN_GIT_COMMIT`,
/// `GITHUB_SHA`), `git rev-parse HEAD`, then `"unknown"`.
///
/// Branch precedence: `options.branch`, metadata (`branch`, `git_branch`,
/// `source_branch`), environment (`HARN_EVAL_BRANCH`, `HARN_GIT_BRANCH`,
/// `GITHUB_REF_NAME`), then `git rev-parse --abbrev-ref HEAD`. On a detached
/// checkout git prints the literal `HEAD`, which is not a branch name, so that
/// answer is dropped and the branch stays `None`.
///
/// The host comes from `HOSTNAME` or `COMPUTERNAME`, else `"unknown"`. Git is
/// only invoked when the earlier sources yield nothing, and runs in `base_dir`
/// when one is given.
fn eval_ledger_provenance(
    base_dir: Option<&Path>,
    options: &EvalLedgerOptions,
    metadata: Option<&BTreeMap<String, serde_json::Value>>,
) -> EvalLedgerProvenance {
    let commit = options
        .commit
        .clone()
        .or_else(|| {
            metadata.and_then(|metadata| {
                metadata_string(metadata, &["commit", "git_commit", "source_commit"])
            })
        })
        .or_else(|| env_string(&["HARN_EVAL_COMMIT", "HARN_GIT_COMMIT", "GITHUB_SHA"]))
        .or_else(|| git_output(base_dir, &["rev-parse", "HEAD"]))
        .unwrap_or_else(|| "unknown".to_string());
    let branch = options
        .branch
        .clone()
        .or_else(|| {
            metadata.and_then(|metadata| {
                metadata_string(metadata, &["branch", "git_branch", "source_branch"])
            })
        })
        .or_else(|| env_string(&["HARN_EVAL_BRANCH", "HARN_GIT_BRANCH", "GITHUB_REF_NAME"]))
        .or_else(|| {
            git_output(base_dir, &["rev-parse", "--abbrev-ref", "HEAD"])
                // Detached HEAD: git echoes "HEAD" instead of a branch name.
                .filter(|name| name.as_str() != "HEAD")
        });
    EvalLedgerProvenance {
        commit,
        branch,
        ts: now_rfc3339(),
        harn_version: crate::bytecode_cache::HARN_VERSION.to_string(),
        host: env_string(&["HOSTNAME", "COMPUTERNAME"]).unwrap_or_else(|| "unknown".to_string()),
    }
}
/// Returns the trimmed value of the first listed environment variable that is
/// set, valid Unicode, and not blank.
fn env_string(keys: &[&str]) -> Option<String> {
    for key in keys {
        let Ok(raw) = std::env::var(key) else {
            continue;
        };
        let trimmed = raw.trim();
        if !trimmed.is_empty() {
            return Some(trimmed.to_string());
        }
    }
    None
}
/// Runs `git` with `args` (inside `base_dir` via `-C` when given) and returns
/// its trimmed standard output.
///
/// Returns `None` when git cannot be started, exits unsuccessfully, prints
/// non-UTF-8 output, or prints nothing but whitespace.
fn git_output(base_dir: Option<&Path>, args: &[&str]) -> Option<String> {
    let mut git = std::process::Command::new("git");
    if let Some(dir) = base_dir {
        git.arg("-C").arg(dir);
    }
    git.args(args);

    let finished = match git.output() {
        Ok(finished) if finished.status.success() => finished,
        _ => return None,
    };
    let text = String::from_utf8(finished.stdout).ok()?;
    let trimmed = text.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
fn metadata_string(
metadata: &BTreeMap<String, serde_json::Value>,
keys: &[&str],
) -> Option<String> {
keys.iter()
.find_map(|key| json_value_string(metadata.get(*key)?))
}
/// Renders a scalar JSON value as a string.
///
/// Strings are trimmed; numbers and booleans use their textual form. Returns
/// `None` for null, arrays, objects, and for results that are empty.
fn json_value_string(value: &serde_json::Value) -> Option<String> {
    let text = match value {
        serde_json::Value::String(text) => text.trim().to_string(),
        serde_json::Value::Number(number) => number.to_string(),
        serde_json::Value::Bool(flag) => flag.to_string(),
        _ => return None,
    };
    if text.is_empty() {
        None
    } else {
        Some(text)
    }
}
/// Picks the model name an eval pack is attributed to.
///
/// Precedence: manifest metadata (`model`, `provider_model`, `route`), the
/// manifest judge's model, then the default judge's model.
fn eval_pack_manifest_model(manifest: &EvalPackManifest) -> Option<String> {
    if let Some(from_metadata) =
        metadata_string(&manifest.metadata, &["model", "provider_model", "route"])
    {
        return Some(from_metadata);
    }
    if let Some(judge_model) = manifest
        .judge
        .as_ref()
        .and_then(|judge| judge.model.clone())
    {
        return Some(judge_model);
    }
    manifest
        .defaults
        .judge
        .as_ref()
        .and_then(|judge| judge.model.clone())
}
/// Selects the rows of the most recent commit other than the current one.
///
/// Rows of the current commit (`options.commit`, or the empty string when
/// unset) are ignored. Rows whose fingerprints differ from the expected ones
/// are reported as mismatches and excluded. Among the remaining rows, the
/// commit owning the highest event id is selected and only its rows are
/// returned.
fn prior_commit_report(
    rows: Vec<EvalLedgerRow>,
    options: &EvalLedgerOptions,
) -> EvalLedgerPriorCommitReport {
    let current_commit = options.commit.as_deref().unwrap_or_default();
    let mut fingerprint_mismatches = Vec::new();
    let mut candidates = Vec::new();
    // Highest event id seen per commit; used to find the most recent commit.
    let mut newest_event_by_commit = BTreeMap::<String, u64>::new();

    for row in rows {
        if row.commit == current_commit {
            continue;
        }
        if let Some(mismatch) = fingerprint_mismatch_for_row(&row, options) {
            fingerprint_mismatches.push(mismatch);
            continue;
        }
        let event_id = row.event_id.unwrap_or_default();
        let newest = newest_event_by_commit
            .entry(row.commit.clone())
            .or_insert(event_id);
        *newest = (*newest).max(event_id);
        candidates.push(row);
    }

    let selected_commit = newest_event_by_commit
        .iter()
        .max_by_key(|(_, event_id)| *event_id)
        .map(|(commit, _)| commit.clone());

    let rows = match &selected_commit {
        Some(commit) => candidates
            .into_iter()
            .filter(|row| &row.commit == commit)
            .collect(),
        None => Default::default(),
    };

    EvalLedgerPriorCommitReport {
        commit: selected_commit,
        model: options.model.clone().unwrap_or_default(),
        split: options.split.clone(),
        rows,
        fingerprint_mismatches,
    }
}
/// Describes how a row's fingerprints differ from the expected ones, if they do.
///
/// Only fingerprints that are set in `options` are compared. Returns `None`
/// when neither comparison fails. In the mismatch record, an unset expectation
/// is reported as an empty string.
fn fingerprint_mismatch_for_row(
    row: &EvalLedgerRow,
    options: &EvalLedgerOptions,
) -> Option<EvalLedgerFingerprintMismatch> {
    let wanted_case = options.case_fingerprint.as_deref();
    let wanted_harness = options.harness_config_fingerprint.as_deref();

    let differs =
        |wanted: Option<&str>, actual: &str| matches!(wanted, Some(value) if value != actual);
    let case_ok = !differs(wanted_case, &row.case_fingerprint);
    let harness_ok = !differs(wanted_harness, &row.harness_config_fingerprint);
    if case_ok && harness_ok {
        return None;
    }

    let mismatch = EvalLedgerFingerprintMismatch {
        case_name: row.case_name.clone(),
        split: row.split.clone(),
        commit: row.commit.clone(),
        trial: row.trial,
        case_fingerprint: row.case_fingerprint.clone(),
        harness_config_fingerprint: row.harness_config_fingerprint.clone(),
        expected_case_fingerprint: wanted_case.unwrap_or_default().to_string(),
        expected_harness_config_fingerprint: wanted_harness.unwrap_or_default().to_string(),
    };
    Some(mismatch)
}
/// Compares the manifest's case/trial grid against recorded ledger rows and
/// decides, per cell, whether it can be skipped or must run.
///
/// For every case and every trial `1..=trial_count` (the case's own `trials`,
/// else the manifest's), the rows recorded for the same suite, model, split,
/// commit, case and trial are looked up:
/// * if one of them carries both the case fingerprint and the harness config
///   fingerprint, the cell is planned as `skip` and points at the matching row
///   with the highest event id;
/// * otherwise the cell is planned as `run`, and every row found for the cell
///   is reported as a fingerprint refusal.
///
/// `completed_cells` equals `skipped_cells` in a plan, and `all_skipped` is
/// only true when at least one cell was requested and none remain.
fn build_eval_ledger_resume_plan(
manifest: &EvalPackManifest,
split_report: &EvalPackSplitValidationReport,
rows: &[EvalLedgerRow],
suite: &str,
model: &str,
commit: &str,
harness_config_fingerprint: &str,
) -> EvalLedgerResumePlan {
// NOTE(review): `split_by_case_id` is defined outside this view; it is used
// here as a lookup from case id to an optional split name.
let split_by_case = split_by_case_id(split_report);
let mut cells = Vec::new();
let mut fingerprint_refusals = Vec::new();
let mut skipped_cells = 0usize;
for (index, case) in manifest.cases.iter().enumerate() {
let case_id = eval_pack_case_id(case, index);
let split = split_by_case.get(&case_id).cloned();
let trial_count = case.trials.unwrap_or(manifest.trials);
// Trials are numbered from 1.
for trial in 1..=trial_count {
let matching = ledger_rows_for_cell(
rows,
suite,
model,
split.as_deref(),
commit,
&case_id,
trial,
);
// Among rows with both fingerprints matching, prefer the highest event id.
let exact = matching
.iter()
.copied()
.filter(|row| {
row.case_fingerprint == case.case_fingerprint
&& row.harness_config_fingerprint == harness_config_fingerprint
})
.max_by_key(|row| row.event_id.unwrap_or_default());
if let Some(row) = exact {
skipped_cells += 1;
cells.push(EvalLedgerResumeCell {
case_name: case_id.clone(),
split: split.clone(),
trial,
status: "skip".to_string(),
reason: "matching ledger row".to_string(),
event_id: row.event_id,
});
continue;
}
// No exact match exists at this point, so every row found for the cell
// differs in at least one fingerprint and is recorded as a refusal.
let mut refused = false;
for row in matching {
if row.case_fingerprint != case.case_fingerprint
|| row.harness_config_fingerprint != harness_config_fingerprint
{
fingerprint_refusals.push(EvalLedgerFingerprintMismatch {
case_name: case_id.clone(),
split: split.clone(),
commit: row.commit.clone(),
trial,
case_fingerprint: row.case_fingerprint.clone(),
harness_config_fingerprint: row.harness_config_fingerprint.clone(),
expected_case_fingerprint: case.case_fingerprint.clone(),
expected_harness_config_fingerprint: harness_config_fingerprint.to_string(),
});
refused = true;
}
}
cells.push(EvalLedgerResumeCell {
case_name: case_id.clone(),
split: split.clone(),
trial,
status: "run".to_string(),
reason: if refused {
"fingerprint mismatch".to_string()
} else {
"missing ledger row".to_string()
},
event_id: None,
});
}
}
let requested_cells = cells.len();
let remaining_cells = requested_cells.saturating_sub(skipped_cells);
EvalLedgerResumePlan {
schema: EVAL_LEDGER_RESUME_PLAN_SCHEMA.to_string(),
suite: suite.to_string(),
model: model.to_string(),
commit: commit.to_string(),
harness_config_fingerprint: harness_config_fingerprint.to_string(),
requested_cells,
completed_cells: skipped_cells,
skipped_cells,
remaining_cells,
all_skipped: requested_cells > 0 && remaining_cells == 0,
fingerprint_refusals,
cells,
}
}
/// Collects references to the rows recorded for one cell: same suite, model,
/// split (including "no split"), commit, case name and trial number.
fn ledger_rows_for_cell<'a>(
    rows: &'a [EvalLedgerRow],
    suite: &str,
    model: &str,
    split: Option<&str>,
    commit: &str,
    case_name: &str,
    trial: usize,
) -> Vec<&'a EvalLedgerRow> {
    let mut hits = Vec::new();
    for row in rows {
        let same_run = row.suite == suite && row.model == model && row.commit == commit;
        let same_cell =
            row.split.as_deref() == split && row.case_name == case_name && row.trial == trial;
        if same_run && same_cell {
            hits.push(row);
        }
    }
    hits
}
/// Wraps an event log error in a `VmError::Runtime` with an eval-ledger prefix.
fn eval_ledger_log_error(error: crate::event_log::LogError) -> VmError {
    let message = format!("eval ledger: event log: {error}");
    VmError::Runtime(message)
}
impl EvalPackLedgerRun {
fn start(
manifest: &EvalPackManifest,
base_dir: Option<&Path>,
options: Option<serde_json::Value>,
) -> Result<Self, VmError> {
let options = eval_ledger_options(options)?;
let suite = options.suite.clone().unwrap_or_else(|| manifest.id.clone());
let model = options
.model
.clone()
.or_else(|| eval_pack_manifest_model(manifest))
.unwrap_or_else(|| "unknown".to_string());
let provenance = eval_ledger_provenance(base_dir, &options, Some(&manifest.metadata));
let commit = options
.commit
.clone()
.unwrap_or_else(|| provenance.commit.clone());
let namespace = eval_pack_ledger_namespace(manifest, &options);
let topic = eval_ledger_topic(&namespace)?;
let log = ensure_eval_ledger_event_log(base_dir);
let read_options = eval_pack_ledger_read_options(&suite, &model, &commit);
let rows = futures::executor::block_on(read_eval_ledger_rows(&log, &topic, &read_options))?;
Ok(Self {
log,
topic,
rows,
suite,
model,
commit,
branch: provenance.branch.clone(),
provenance,
inserted: 0,
duplicates: 0,
fingerprint_refusals: Vec::new(),
})
}
fn replay_row_for_cell(
&mut self,
case_id: &str,
split: Option<&str>,
trial: usize,
case_fingerprint: &str,
harness_config_fingerprint: &str,
) -> Option<EvalLedgerRow> {
let matching = ledger_rows_for_cell(
&self.rows,
&self.suite,
&self.model,
split,
&self.commit,
case_id,
trial,
);
let exact = matching
.iter()
.copied()
.filter(|row| {
row.case_fingerprint == case_fingerprint
&& row.harness_config_fingerprint == harness_config_fingerprint
})
.max_by_key(|row| row.event_id.unwrap_or_default())
.cloned();
if exact.is_some() {
return exact;
}
for row in matching {
if row.case_fingerprint != case_fingerprint
|| row.harness_config_fingerprint != harness_config_fingerprint
{
self.fingerprint_refusals
.push(EvalLedgerFingerprintMismatch {
case_name: case_id.to_string(),
split: split.map(str::to_string),
commit: row.commit.clone(),
trial,
case_fingerprint: row.case_fingerprint.clone(),
harness_config_fingerprint: row.harness_config_fingerprint.clone(),
expected_case_fingerprint: case_fingerprint.to_string(),
expected_harness_config_fingerprint: harness_config_fingerprint.to_string(),
});
}
}
None
}
fn append_trial_row(&mut self, row: EvalLedgerRow) -> Result<(), VmError> {
let report = futures::executor::block_on(append_eval_ledger_rows(
&self.log,
&self.topic,
vec![row],
))?;
self.inserted += report.inserted;
self.duplicates += report.duplicates;
self.rows.extend(report.rows);
Ok(())
}
/// Builds the final run-state summary, appends it to the ledger topic, and
/// returns it with `heartbeat_event_id` set to the id of the appended event.
///
/// `completed_cells` is `skipped_cells + executed_cells`; `remaining_cells` is
/// what is left of `requested_cells` after subtracting that (saturating at 0).
fn finish(
    &self,
    requested_cells: usize,
    skipped_cells: usize,
    executed_cells: usize,
) -> Result<EvalPackRunState, VmError> {
    let completed_cells = skipped_cells + executed_cells;
    let every_cell_skipped = requested_cells > 0 && skipped_cells == requested_cells;
    let mut state = EvalPackRunState {
        schema: EVAL_LEDGER_RUN_STATE_SCHEMA.to_string(),
        suite: self.suite.clone(),
        model: self.model.clone(),
        commit: self.commit.clone(),
        branch: self.branch.clone(),
        requested_cells,
        completed_cells,
        skipped_cells,
        executed_cells,
        remaining_cells: requested_cells.saturating_sub(completed_cells),
        ledger_rows_inserted: self.inserted,
        ledger_rows_duplicate: self.duplicates,
        fingerprint_refusals: self.fingerprint_refusals.len(),
        all_skipped: every_cell_skipped,
        heartbeat_event_id: None,
    };
    // The state is persisted without an id first; the id is only known afterwards.
    state.heartbeat_event_id = Some(self.append_run_state(&state)?);
    Ok(state)
}
/// Serializes `state` to JSON, appends it to the ledger topic as a run-state
/// event, flushes the log, and returns the id of the appended event.
///
/// Encoding failures become `VmError::Runtime`; log failures are mapped through
/// `eval_ledger_log_error`.
fn append_run_state(&self, state: &EvalPackRunState) -> Result<u64, VmError> {
    let payload = match serde_json::to_value(state) {
        Ok(value) => value,
        Err(e) => {
            return Err(VmError::Runtime(format!(
                "eval run-state encode error: {e}"
            )))
        }
    };
    let event = crate::event_log::LogEvent::new(EVAL_LEDGER_RUN_STATE_KIND, payload);
    let appended = futures::executor::block_on(self.log.append(&self.topic, event));
    let event_id = appended.map_err(eval_ledger_log_error)?;
    let flushed = futures::executor::block_on(self.log.flush());
    flushed.map_err(eval_ledger_log_error)?;
    Ok(event_id)
}
}
/// Validates the manifest's split definition and returns the validation report.
///
/// # Errors
///
/// Returns `VmError::Runtime` listing every rendered validation problem
/// (joined with `"; "`) when the report is not valid.
pub fn validate_eval_pack_split(
    manifest: &EvalPackManifest,
) -> Result<EvalPackSplitValidationReport, VmError> {
    let report = eval_pack_split_validation_report(manifest);
    if report.valid {
        return Ok(report);
    }
    let problems = render_split_validation_errors(&report).join("; ");
    Err(VmError::Runtime(format!(
        "eval pack split invalid: {}",
        problems
    )))
}
fn eval_pack_split_validation_report(manifest: &EvalPackManifest) -> EvalPackSplitValidationReport {
let case_ids = eval_pack_case_ids(manifest);
let mut duplicate_case_ids = duplicates(&case_ids);
duplicate_case_ids.sort();
let case_set = case_ids.iter().cloned().collect::<BTreeSet<_>>();
let Some(split) = &manifest.split else {
return EvalPackSplitValidationReport {
valid: duplicate_case_ids.is_empty(),
case_count: case_ids.len(),
covered_count: 0,
duplicate_case_ids,
..EvalPackSplitValidationReport::default()
};
};
let mut duplicate_partition_cases = Vec::new();
let mut unknown_cases = Vec::new();
let mut seen_by_case: BTreeMap<String, Vec<String>> = BTreeMap::new();
for (partition, cases) in &split.partitions {
let mut local_seen = BTreeSet::new();
for case_id in cases {
if !local_seen.insert(case_id.clone()) {
duplicate_partition_cases.push(format!("{partition}:{case_id}"));
}
if !case_set.contains(case_id) {
unknown_cases.push(format!("{partition}:{case_id}"));
}
let partitions = seen_by_case.entry(case_id.clone()).or_default();
if !partitions.contains(partition) {
partitions.push(partition.clone());
}
}
}
let mut overlap_cases = seen_by_case
.iter()
.filter(|(case_id, partitions)| case_set.contains(*case_id) && partitions.len() > 1)
.map(|(case_id, partitions)| format!("{case_id}:{}", partitions.join(",")))
.collect::<Vec<_>>();
let mut missing_cases = case_set
.iter()
.filter(|case_id| !seen_by_case.contains_key(*case_id))
.cloned()
.collect::<Vec<_>>();
duplicate_partition_cases.sort();
unknown_cases.sort();
overlap_cases.sort();
missing_cases.sort();
let covered_count = case_set
.iter()
.filter(|case_id| seen_by_case.contains_key(*case_id))
.count();
let valid = duplicate_case_ids.is_empty()
&& duplicate_partition_cases.is_empty()
&& unknown_cases.is_empty()
&& overlap_cases.is_empty()
&& missing_cases.is_empty();
EvalPackSplitValidationReport {
valid,
partitions: split.partitions.clone(),
case_count: case_ids.len(),
covered_count,
duplicate_case_ids,
duplicate_partition_cases,
overlap_cases,
unknown_cases,
missing_cases,
}
}
/// Returns the effective id of every case in the manifest, in manifest order.
fn eval_pack_case_ids(manifest: &EvalPackManifest) -> Vec<String> {
    let mut ids = Vec::new();
    for (position, case) in manifest.cases.iter().enumerate() {
        ids.push(eval_pack_case_id(case, position));
    }
    ids
}
/// Returns the case's declared id, or `case_<n>` (1-based position) when the id
/// is absent or contains only whitespace.
fn eval_pack_case_id(case: &EvalPackCase, index: usize) -> String {
    match &case.id {
        Some(id) if !id.trim().is_empty() => id.clone(),
        _ => format!("case_{}", index + 1),
    }
}
/// Returns every value that occurs more than once in `values`, each listed a
/// single time, in ascending order.
fn duplicates(values: &[String]) -> Vec<String> {
    let mut first_sightings: BTreeSet<&String> = BTreeSet::new();
    let mut repeated: BTreeSet<&String> = BTreeSet::new();
    for value in values {
        let already_seen = !first_sightings.insert(value);
        if already_seen {
            repeated.insert(value);
        }
    }
    repeated.into_iter().cloned().collect()
}
/// Renders one human-readable line per non-empty problem list in the report.
///
/// Lines appear in a fixed order: duplicate case ids, duplicate partition
/// entries, overlapping cases, unknown cases, missing cases. When every list is
/// empty a single generic message is returned instead.
fn render_split_validation_errors(report: &EvalPackSplitValidationReport) -> Vec<String> {
    let sections: [(&str, &[String]); 5] = [
        ("duplicate case ids", report.duplicate_case_ids.as_slice()),
        (
            "duplicate partition entries",
            report.duplicate_partition_cases.as_slice(),
        ),
        ("overlapping cases", report.overlap_cases.as_slice()),
        ("unknown cases", report.unknown_cases.as_slice()),
        ("missing cases", report.missing_cases.as_slice()),
    ];
    let mut errors: Vec<String> = sections
        .iter()
        .filter(|(_, entries)| !entries.is_empty())
        .map(|(label, entries)| format!("{}: {}", label, entries.join(", ")))
        .collect();
    if errors.is_empty() {
        errors.push("unknown split validation error".to_string());
    }
    errors
}
/// Reads the file at `path` and parses it as a JSON `ReplayFixture`.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the file cannot be read or parsed.
fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
    let raw = match std::fs::read_to_string(path) {
        Ok(raw) => raw,
        Err(e) => {
            return Err(VmError::Runtime(format!(
                "failed to read replay fixture: {e}"
            )))
        }
    };
    serde_json::from_str::<ReplayFixture>(&raw)
        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
}
/// Loads a run record from a fixture reference.
///
/// An inline value takes precedence and is deserialized directly; otherwise the
/// fixture's `path` is resolved against `base_dir` and loaded from disk.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the inline value does not parse or when the
/// fixture has neither an inline value nor a path; load errors are propagated.
fn load_run_record_from_fixture_ref(
    fixture: &EvalPackFixtureRef,
    base_dir: Option<&Path>,
) -> Result<RunRecord, VmError> {
    match &fixture.inline {
        Some(inline) => serde_json::from_value::<RunRecord>(inline.clone())
            .map_err(|e| VmError::Runtime(format!("failed to parse inline run record: {e}"))),
        None => {
            let Some(path) = fixture.path.as_deref() else {
                return Err(VmError::Runtime(format!(
                    "fixture '{}' is missing path or inline run",
                    fixture.id
                )));
            };
            load_run_record(&resolve_manifest_path(base_dir, path))
        }
    }
}
/// Loads a replay fixture from a fixture reference.
///
/// An inline value takes precedence and is deserialized directly; otherwise the
/// fixture's `path` is resolved against `base_dir` and loaded from disk.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the inline value does not parse or when the
/// fixture has neither an inline value nor a path; load errors are propagated.
fn load_replay_fixture_from_ref(
    fixture: &EvalPackFixtureRef,
    base_dir: Option<&Path>,
) -> Result<ReplayFixture, VmError> {
    match &fixture.inline {
        Some(inline) => serde_json::from_value::<ReplayFixture>(inline.clone())
            .map_err(|e| VmError::Runtime(format!("failed to parse inline replay fixture: {e}"))),
        None => {
            let Some(path) = fixture.path.as_deref() else {
                return Err(VmError::Runtime(format!(
                    "fixture '{}' is missing path or inline replay fixture",
                    fixture.id
                )));
            };
            load_replay_fixture(&resolve_manifest_path(base_dir, path))
        }
    }
}
/// Resolves a manifest-relative path.
///
/// Absolute paths are returned unchanged. Relative paths are joined onto
/// `base_dir` when one is given and returned as-is otherwise.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = Path::new(path);
    match base_dir {
        Some(root) if !candidate.is_absolute() => root.join(candidate),
        _ => candidate.to_path_buf(),
    }
}
/// Evaluates every case of a replay suite manifest and summarizes the results.
///
/// For each case the run record is loaded, checked against its replay fixture
/// (an explicit `fixture_path`, else the fixture embedded in the run, else one
/// derived from the run), and optionally diffed against a `compare_to`
/// baseline. A non-identical diff fails the case. The suite passes when no case
/// fails.
///
/// # Errors
///
/// Propagates any error from loading a run record, fixture, or baseline.
pub fn evaluate_run_suite_manifest(
    manifest: &EvalSuiteManifest,
) -> Result<ReplayEvalSuiteReport, VmError> {
    let root = manifest.base_dir.as_deref().map(Path::new);
    let mut case_reports = Vec::new();
    for case in &manifest.cases {
        let run_path = resolve_manifest_path(root, &case.run_path);
        let run = load_run_record(&run_path)?;
        let fixture = if let Some(path) = &case.fixture_path {
            load_replay_fixture(&resolve_manifest_path(root, path))?
        } else {
            run.replay_fixture
                .clone()
                .unwrap_or_else(|| replay_fixture_from_run(&run))
        };
        let eval = evaluate_run_against_fixture(&run, &fixture);
        let mut pass = eval.pass;
        let mut failures = eval.failures;

        let mut comparison = None;
        if let Some(path) = &case.compare_to {
            let baseline_path = resolve_manifest_path(root, path);
            let baseline = load_run_record(&baseline_path)?;
            let diff = diff_run_records(&baseline, &run);
            if !diff.identical {
                pass = false;
                failures.push(format!(
                    "run differs from baseline {} with {} stage changes",
                    baseline_path.display(),
                    diff.stage_diffs.len()
                ));
            }
            comparison = Some(diff);
        }

        case_reports.push(ReplayEvalCaseReport {
            run_id: run.id.clone(),
            workflow_id: run.workflow_id.clone(),
            label: case.label.clone(),
            pass,
            failures,
            stage_count: eval.stage_count,
            source_path: Some(run_path.display().to_string()),
            comparison,
        });
    }

    let total = case_reports.len();
    let passed = case_reports.iter().filter(|report| report.pass).count();
    let failed = total.saturating_sub(passed);
    Ok(ReplayEvalSuiteReport {
        pass: failed == 0,
        total,
        passed,
        failed,
        cases: case_reports,
    })
}
/// Evaluates an eval pack manifest with the ledger disabled, so every trial is
/// executed and nothing is recorded.
pub fn evaluate_eval_pack_manifest(manifest: &EvalPackManifest) -> Result<EvalPackReport, VmError> {
    let ledger_enabled = false;
    evaluate_eval_pack_manifest_inner(manifest, ledger_enabled, None)
}
/// Evaluates an eval pack manifest with the ledger enabled, forwarding
/// `ledger_options` to the ledger run.
pub fn evaluate_eval_pack_manifest_resumable(
    manifest: &EvalPackManifest,
    ledger_options: Option<serde_json::Value>,
) -> Result<EvalPackReport, VmError> {
    let ledger_enabled = true;
    evaluate_eval_pack_manifest_inner(manifest, ledger_enabled, ledger_options)
}
/// Shared implementation behind `evaluate_eval_pack_manifest` and
/// `evaluate_eval_pack_manifest_resumable`.
///
/// Validates the manifest split, runs every (case, trial) cell, runs the
/// manifest's persona ladders, and aggregates everything into an
/// `EvalPackReport`. When `ledger_enabled` is true a ledger run is started with
/// `ledger_options`; cells with a replayable ledger row are reported from that
/// row instead of being executed, and each executed cell is appended to the
/// ledger.
///
/// # Errors
///
/// Propagates split-validation, fingerprint, ledger, trial, and ladder errors.
fn evaluate_eval_pack_manifest_inner(
manifest: &EvalPackManifest,
ledger_enabled: bool,
ledger_options: Option<serde_json::Value>,
) -> Result<EvalPackReport, VmError> {
let base_dir = manifest.base_dir.as_deref().map(Path::new);
// Fixture references resolve against `defaults.fixture_root` when it is set,
// falling back to the manifest base directory.
let fixture_base_dir_buf = manifest
.defaults
.fixture_root
.as_deref()
.map(|root| resolve_manifest_path(base_dir, root));
let fixture_base_dir = fixture_base_dir_buf.as_deref().or(base_dir);
// Lookup tables by id; entries with an empty id are left out.
let fixtures_by_id: BTreeMap<&str, &EvalPackFixtureRef> = manifest
.fixtures
.iter()
.filter(|fixture| !fixture.id.is_empty())
.map(|fixture| (fixture.id.as_str(), fixture))
.collect();
let rubrics_by_id: BTreeMap<&str, &EvalPackRubric> = manifest
.rubrics
.iter()
.filter(|rubric| !rubric.id.is_empty())
.map(|rubric| (rubric.id.as_str(), rubric))
.collect();
// An invalid split aborts the whole evaluation before any trial runs.
let split_report = validate_eval_pack_split(manifest)?;
let split_by_case = split_by_case_id(&split_report);
let harness_config_fingerprint = eval_pack_harness_config_fingerprint(manifest)?;
let mut ledger = if ledger_enabled {
Some(EvalPackLedgerRun::start(
manifest,
base_dir,
ledger_options,
)?)
} else {
None
};
// Cell accounting: one cell per (case, trial) pair.
let mut requested_cells = 0usize;
let mut skipped_cells = 0usize;
let mut executed_cells = 0usize;
let mut reports = Vec::new();
for (index, case) in manifest.cases.iter().enumerate() {
let case_id = eval_pack_case_id(case, index);
// Display label preference: case name, then declared id, then effective id.
let label = case
.name
.clone()
.or_else(|| case.id.clone())
.unwrap_or_else(|| case_id.clone());
let severity = eval_pack_case_severity(manifest, case);
let blocking = severity == "blocking";
let trial_count = case.trials.unwrap_or(manifest.trials);
let split = split_by_case.get(&case_id).cloned();
requested_cells += trial_count;
let mut trials = Vec::with_capacity(trial_count);
// Trials are numbered from 1.
for trial in 1..=trial_count {
if let Some(ledger) = ledger.as_mut() {
// Reuse a recorded row for this cell when the ledger has one with
// matching fingerprints; the trial is then counted as skipped.
if let Some(row) = ledger.replay_row_for_cell(
&case_id,
split.as_deref(),
trial,
&case.case_fingerprint,
&harness_config_fingerprint,
) {
skipped_cells += 1;
trials.push(eval_pack_trial_report_from_ledger_row(&row, blocking));
continue;
}
}
// Cases that declare `friction_events` use the friction evaluator; all
// other cases are evaluated as recorded runs.
let report = if case.friction_events.is_some() {
evaluate_eval_pack_friction_trial(
manifest,
case,
trial,
&severity,
blocking,
base_dir,
fixture_base_dir,
&fixtures_by_id,
&rubrics_by_id,
)?
} else {
evaluate_eval_pack_run_trial(
manifest,
case,
trial,
&severity,
blocking,
base_dir,
fixture_base_dir,
&fixtures_by_id,
&rubrics_by_id,
)?
};
// Record the freshly executed trial before counting it as executed.
if let Some(ledger) = ledger.as_mut() {
let row = eval_ledger_row_from_trial(
case,
&case_id,
split.clone(),
&ledger.suite,
&ledger.model,
&ledger.commit,
&ledger.provenance,
&harness_config_fingerprint,
&report,
);
ledger.append_trial_row(row)?;
}
executed_cells += 1;
trials.push(report);
}
reports.push(eval_pack_case_report_from_trials(
case,
case_id,
label,
severity,
split,
blocking,
harness_config_fingerprint.clone(),
trials,
));
}
// Ladders without their own base directory inherit the manifest's.
let mut ladder_reports = Vec::new();
for ladder in &manifest.ladders {
let mut ladder = ladder.clone();
if ladder.base_dir.is_none() {
ladder.base_dir = manifest.base_dir.clone();
}
ladder_reports.push(run_persona_eval_ladder(&ladder)?);
}
let stats_rows = reports
.iter()
.map(|report| report.stats_row.clone())
.collect::<Vec<_>>();
let stats = eval_pack_stats_report(&stats_rows);
// Totals count cases and ladders together.
let case_total = reports.len();
let ladder_total = ladder_reports.len();
let total = case_total + ladder_total;
let trial_count = reports.iter().map(|report| report.trial_count).sum();
// A blocking case fails unless its reliability status is "all-pass".
let case_blocking_failed = reports
.iter()
.filter(|report| report.blocking && report.reliability.status != "all-pass")
.count();
let ladder_blocking_failed = ladder_reports
.iter()
.filter(|report| report.blocking && !report.pass)
.count();
let blocking_failed = case_blocking_failed + ladder_blocking_failed;
let warning_failed = reports
.iter()
.filter(|report| !report.warnings.is_empty())
.count()
+ ladder_reports
.iter()
.filter(|report| !report.pass && report.severity == "warning")
.count();
let informational_failed = reports
.iter()
.filter(|report| !report.informational.is_empty())
.count()
+ ladder_reports
.iter()
.filter(|report| !report.pass && report.severity == "informational")
.count();
let passed = reports.iter().filter(|report| report.pass).count()
+ ladder_reports.iter().filter(|report| report.pass).count();
// With a ledger, `finish` appends the run state to the log; without one a
// run state is synthesized in which every requested cell counts as executed.
let run_state = match ledger.as_ref() {
Some(ledger) => ledger.finish(requested_cells, skipped_cells, executed_cells)?,
None => EvalPackRunState {
schema: EVAL_LEDGER_RUN_STATE_SCHEMA.to_string(),
suite: manifest.id.clone(),
model: eval_pack_manifest_model(manifest).unwrap_or_else(|| "unknown".to_string()),
requested_cells,
completed_cells: requested_cells,
executed_cells: requested_cells,
..EvalPackRunState::default()
},
};
Ok(EvalPackReport {
pack_id: manifest.id.clone(),
harness_config_fingerprint,
// Only blocking failures fail the pack as a whole.
pass: blocking_failed == 0,
total,
passed,
failed: total.saturating_sub(passed),
blocking_failed,
warning_failed,
informational_failed,
trial_count,
run_state,
// The split report is only attached when the manifest declares a split.
split: manifest.split.as_ref().map(|_| split_report),
stats,
stats_rows,
cases: reports,
ladders: ladder_reports,
})
}
/// Evaluates one trial of a run-based case.
///
/// Failures are collected in this order: replay-fixture evaluation, manifest
/// default thresholds, case thresholds, baseline diff (the case's `compare_to`,
/// else the manifest baseline), then each referenced rubric. An unknown rubric
/// id is itself a failure.
///
/// # Errors
///
/// Propagates errors from loading the run, fixture, or baseline.
#[allow(clippy::too_many_arguments)]
fn evaluate_eval_pack_run_trial(
    manifest: &EvalPackManifest,
    case: &EvalPackCase,
    trial: usize,
    severity: &str,
    blocking: bool,
    base_dir: Option<&Path>,
    fixture_base_dir: Option<&Path>,
    fixtures_by_id: &BTreeMap<&str, &EvalPackFixtureRef>,
    rubrics_by_id: &BTreeMap<&str, &EvalPackRubric>,
) -> Result<EvalPackTrialReport, VmError> {
    let run = load_eval_pack_case_run(case, base_dir, fixture_base_dir, fixtures_by_id)?;
    let fixture =
        load_eval_pack_case_fixture(case, base_dir, fixture_base_dir, fixtures_by_id, &run)?;
    let eval = evaluate_run_against_fixture(&run, &fixture);
    let stage_count = eval.stage_count;

    let mut failures = eval.failures;
    let mut warnings = Vec::new();
    let informational = Vec::new();

    for thresholds in [&manifest.defaults.thresholds, &case.thresholds].iter() {
        apply_eval_pack_thresholds(&run, thresholds, &mut failures);
    }

    let mut comparison = None;
    if let Some(path) = case.compare_to.as_ref().or(manifest.baseline.as_ref()) {
        let baseline_path = resolve_manifest_path(base_dir, path);
        let baseline = load_run_record(&baseline_path)?;
        let diff = diff_run_records(&baseline, &run);
        if !diff.identical {
            failures.push(format!(
                "run differs from baseline {} with {} stage changes",
                baseline_path.display(),
                diff.stage_diffs.len()
            ));
        }
        comparison = Some(diff);
    }

    for rubric_id in &case.rubrics {
        match rubrics_by_id.get(rubric_id.as_str()) {
            Some(rubric) => apply_eval_pack_rubric(rubric, &run, &mut failures, &mut warnings),
            None => failures.push(format!("case references unknown rubric '{rubric_id}'")),
        }
    }

    let timed_out = run.status.to_ascii_lowercase().contains("timeout");
    let wall_time_seconds = run
        .usage
        .as_ref()
        .map(|usage| usage.total_duration_ms as f64 / 1000.0)
        .unwrap_or_default();
    let cost_usd = run
        .usage
        .as_ref()
        .map(|usage| usage.total_cost)
        .unwrap_or_default();
    let source_path = eval_pack_case_source_path(case, base_dir, fixture_base_dir, fixtures_by_id);

    Ok(eval_pack_trial_report(
        trial,
        severity,
        blocking,
        run.id.clone(),
        run.workflow_id.clone(),
        source_path,
        stage_count,
        timed_out,
        wall_time_seconds,
        cost_usd,
        failures,
        warnings,
        informational,
        comparison,
    ))
}
/// Evaluates one trial of a friction-events case.
///
/// Loads the case's friction events, generates context-pack suggestions from
/// them, and applies each referenced rubric to those suggestions. An unknown
/// rubric id is a failure. A case with no rubrics fails when no suggestion was
/// produced. The report uses the event count as its stage count.
///
/// # Errors
///
/// Propagates errors from loading the friction events.
#[allow(clippy::too_many_arguments)]
fn evaluate_eval_pack_friction_trial(
    manifest: &EvalPackManifest,
    case: &EvalPackCase,
    trial: usize,
    severity: &str,
    blocking: bool,
    base_dir: Option<&Path>,
    fixture_base_dir: Option<&Path>,
    fixtures_by_id: &BTreeMap<&str, &EvalPackFixtureRef>,
    rubrics_by_id: &BTreeMap<&str, &EvalPackRubric>,
) -> Result<EvalPackTrialReport, VmError> {
    let events =
        load_eval_pack_case_friction_events(case, base_dir, fixture_base_dir, fixtures_by_id)?;
    let suggestion_options = friction_suggestion_options(case, manifest);
    let suggestions = generate_context_pack_suggestions(&events, &suggestion_options);

    let mut failures = Vec::new();
    let mut warnings = Vec::new();
    let informational = Vec::new();

    for rubric_id in &case.rubrics {
        match rubrics_by_id.get(rubric_id.as_str()) {
            Some(rubric) => {
                apply_eval_pack_friction_rubric(rubric, &suggestions, &mut failures, &mut warnings)
            }
            None => failures.push(format!("case references unknown rubric '{rubric_id}'")),
        }
    }
    let nothing_checked_or_produced = case.rubrics.is_empty() && suggestions.is_empty();
    if nothing_checked_or_produced {
        failures.push("friction fixture produced no context-pack suggestions".to_string());
    }

    let source_path =
        eval_pack_case_friction_source_path(case, base_dir, fixture_base_dir, fixtures_by_id);
    Ok(eval_pack_trial_report(
        trial,
        severity,
        blocking,
        "friction_events".to_string(),
        String::new(),
        source_path,
        events.len(),
        false,
        0.0,
        0.0,
        failures,
        warnings,
        informational,
        None,
    ))
}
/// Assembles an `EvalPackTrialReport` from the collected trial outcome.
///
/// `verification` is `"FAIL"` when any failure was collected and `"PASS"`
/// otherwise. A non-blocking trial always passes; its failures are moved into
/// `warnings` when `severity` is `"warning"` and into `informational`
/// otherwise, leaving `failures` empty.
#[allow(clippy::too_many_arguments)]
fn eval_pack_trial_report(
    trial: usize,
    severity: &str,
    blocking: bool,
    run_id: String,
    workflow_id: String,
    source_path: Option<String>,
    stage_count: usize,
    timed_out: bool,
    wall_time_seconds: f64,
    cost_usd: f64,
    mut failures: Vec<String>,
    mut warnings: Vec<String>,
    mut informational: Vec<String>,
    comparison: Option<RunDiffReport>,
) -> EvalPackTrialReport {
    let has_failures = !failures.is_empty();
    let verification = String::from(if has_failures { "FAIL" } else { "PASS" });
    let pass = !(has_failures && blocking);
    if has_failures && !blocking {
        // Demote the failures to the bucket that matches the case severity.
        let demoted_to = if severity == "warning" {
            &mut warnings
        } else {
            &mut informational
        };
        demoted_to.append(&mut failures);
    }
    EvalPackTrialReport {
        trial,
        verification,
        pass,
        blocking,
        run_id,
        workflow_id,
        source_path,
        stage_count,
        failures,
        warnings,
        informational,
        comparison,
        timed_out,
        wall_time_seconds,
        cost_usd,
    }
}
/// Converts one executed trial into a single-trial ledger row.
///
/// The pass/fail/skip counters are each 0 or 1, derived from the trial's
/// `verification` string; `timeouts` is 1 when the trial timed out. The full
/// trial report is embedded in the row, and fields not set here take their
/// `Default` values.
#[allow(clippy::too_many_arguments)]
fn eval_ledger_row_from_trial(
    case: &EvalPackCase,
    case_id: &str,
    split: Option<String>,
    suite: &str,
    model: &str,
    commit: &str,
    provenance: &EvalLedgerProvenance,
    harness_config_fingerprint: &str,
    trial: &EvalPackTrialReport,
) -> EvalLedgerRow {
    let verdict = trial.verification.as_str();
    let passes = usize::from(verdict == "PASS");
    let fails = usize::from(verdict == "FAIL");
    let skips = usize::from(verdict.eq_ignore_ascii_case("skip"));
    let timeouts = usize::from(trial.timed_out);
    let case_label = case_id.to_string();
    EvalLedgerRow {
        schema: EVAL_LEDGER_ROW_SCHEMA.to_string(),
        suite: suite.to_string(),
        model: model.to_string(),
        split,
        commit: commit.to_string(),
        case_name: case_label.clone(),
        name: case_label,
        case_fingerprint: case.case_fingerprint.clone(),
        harness_config_fingerprint: harness_config_fingerprint.to_string(),
        trial: trial.trial,
        // Each row describes exactly one trial, so the aggregates equal the
        // single-trial values.
        trials: 1,
        passes,
        fails,
        skips,
        timeouts,
        pass_rate: passes as f64,
        status: verdict.to_string(),
        verification: verdict.to_string(),
        skipped: false,
        wall_time_seconds: trial.wall_time_seconds,
        cost_usd: trial.cost_usd,
        mean_wall_time_seconds: trial.wall_time_seconds,
        total_cost_usd: trial.cost_usd,
        run_id: trial.run_id.clone(),
        workflow_id: trial.workflow_id.clone(),
        source_path: trial.source_path.clone(),
        trial_report: Some(trial.clone()),
        provenance: provenance.clone(),
        metadata: case.metadata.clone(),
        ..EvalLedgerRow::default()
    }
}
/// Rebuilds a trial report from a ledger row.
///
/// When the row embeds a trial report, a clone of it is returned with only its
/// trial number overwritten from the row. Otherwise a minimal report is
/// synthesized from the row's summary fields: the verdict is `verification`
/// (or `status` when that is empty), a `"FAIL"` verdict yields one generic
/// failure message, and the stage count is 0.
fn eval_pack_trial_report_from_ledger_row(
    row: &EvalLedgerRow,
    blocking: bool,
) -> EvalPackTrialReport {
    if let Some(recorded) = &row.trial_report {
        let mut report = recorded.clone();
        report.trial = row.trial;
        return report;
    }
    let verification = if row.verification.is_empty() {
        &row.status
    } else {
        &row.verification
    }
    .clone();
    let failed = verification == "FAIL";
    let failures = if failed {
        vec!["ledger row recorded a failed trial".to_string()]
    } else {
        Vec::new()
    };
    EvalPackTrialReport {
        trial: row.trial,
        verification,
        pass: !failed || !blocking,
        blocking,
        run_id: row.run_id.clone(),
        workflow_id: row.workflow_id.clone(),
        source_path: row.source_path.clone(),
        stage_count: 0,
        failures,
        warnings: Vec::new(),
        informational: Vec::new(),
        comparison: None,
        timed_out: row.timeouts > 0,
        wall_time_seconds: row.wall_time_seconds,
        cost_usd: row.cost_usd,
    }
}
/// Aggregates a case's trial reports into a single case report.
///
/// A blocking case passes only when its reliability status is `"all-pass"`; a
/// non-blocking case always passes. Run id, workflow id, source path, stage
/// count and comparison are taken from the first trial (defaults when there are
/// no trials). Messages from all trials are merged via
/// `prefixed_trial_messages`.
#[allow(clippy::too_many_arguments)]
fn eval_pack_case_report_from_trials(
    case: &EvalPackCase,
    case_id: String,
    label: String,
    severity: String,
    split: Option<String>,
    blocking: bool,
    harness_config_fingerprint: String,
    trials: Vec<EvalPackTrialReport>,
) -> EvalPackCaseReport {
    let reliability = eval_pack_reliability_report(&trials);
    let stats_row = eval_pack_stats_row(
        case,
        &case_id,
        &harness_config_fingerprint,
        split.clone(),
        &trials,
        &reliability,
    );
    let pass = !blocking || reliability.status == "all-pass";
    let failures = prefixed_trial_messages(&trials, |trial| &trial.failures);
    let warnings = prefixed_trial_messages(&trials, |trial| &trial.warnings);
    let informational = prefixed_trial_messages(&trials, |trial| &trial.informational);
    let (run_id, workflow_id, source_path, stage_count, comparison) = match trials.first() {
        Some(first) => (
            first.run_id.clone(),
            first.workflow_id.clone(),
            first.source_path.clone(),
            first.stage_count,
            first.comparison.clone(),
        ),
        None => (String::new(), String::new(), None, 0, None),
    };
    EvalPackCaseReport {
        id: case_id,
        label,
        severity,
        split,
        case_fingerprint: case.case_fingerprint.clone(),
        harness_config_fingerprint,
        pass,
        blocking,
        run_id,
        workflow_id,
        source_path,
        stage_count,
        trial_count: trials.len(),
        total_stage_count: trials.iter().map(|trial| trial.stage_count).sum(),
        reliability,
        stats_row,
        comparison,
        trials,
        failures,
        warnings,
        informational,
    }
}
/// Flattens the messages selected by `messages` across all trials, in trial
/// order.
///
/// With more than one trial each message is prefixed with `trial <n>: `; with
/// zero or one trial the messages are copied unchanged.
fn prefixed_trial_messages<F>(trials: &[EvalPackTrialReport], messages: F) -> Vec<String>
where
    F: Fn(&EvalPackTrialReport) -> &Vec<String>,
{
    let numbered = trials.len() > 1;
    trials
        .iter()
        .flat_map(|trial| {
            messages(trial).iter().map(move |message| {
                if numbered {
                    format!("trial {}: {message}", trial.trial)
                } else {
                    message.clone()
                }
            })
        })
        .collect()
}
/// Summarizes how consistently a case's trials passed.
///
/// Status is `"no-decision"` with no PASS/FAIL trials, `"all-pass"` with no
/// failures, `"all-fail"` with no passes, and `"flaky"` otherwise. `majority`
/// is only set for mixed results and resolves ties to `"PASS"`. `pass_rate`
/// divides passes by the total number of trials (0.0 when there are none).
fn eval_pack_reliability_report(trials: &[EvalPackTrialReport]) -> EvalPackReliabilityReport {
    let mut passes = 0usize;
    let mut fails = 0usize;
    let mut skips = 0usize;
    let mut timeouts = 0usize;
    for trial in trials {
        if trial.verification == "PASS" {
            passes += 1;
        }
        if trial.verification == "FAIL" {
            fails += 1;
        }
        if trial.verification.eq_ignore_ascii_case("skip") {
            skips += 1;
        }
        if trial.timed_out {
            timeouts += 1;
        }
    }
    let decided = passes + fails;
    let status = match (decided, passes, fails) {
        (0, _, _) => "no-decision",
        (_, _, 0) => "all-pass",
        (_, 0, _) => "all-fail",
        _ => "flaky",
    };
    let majority = if passes == 0 || fails == 0 {
        None
    } else if passes >= fails {
        Some("PASS".to_string())
    } else {
        Some("FAIL".to_string())
    };
    let pass_rate = match trials.len() {
        0 => 0.0,
        count => passes as f64 / count as f64,
    };
    EvalPackReliabilityReport {
        status: status.to_string(),
        trials: trials.len(),
        passes,
        fails,
        skips,
        timeouts,
        decided,
        pass_rate,
        majority,
    }
}
/// Builds the per-case statistics row from its trials and reliability summary.
///
/// `group` is read from the first metadata key present among `group`,
/// `language`, `bucket` (empty when that value is not a string or no key is
/// present). Wall time is the mean across trials and cost is the sum; both
/// appear under two field names each.
fn eval_pack_stats_row(
    case: &EvalPackCase,
    case_id: &str,
    harness_config_fingerprint: &str,
    split: Option<String>,
    trials: &[EvalPackTrialReport],
    reliability: &EvalPackReliabilityReport,
) -> EvalPackStatsRow {
    let wall_times: Vec<f64> = trials.iter().map(|trial| trial.wall_time_seconds).collect();
    let costs: Vec<f64> = trials.iter().map(|trial| trial.cost_usd).collect();
    let mean_wall_time = mean(&wall_times);
    let total_cost: f64 = costs.iter().sum();

    let group = ["group", "language", "bucket"]
        .iter()
        .find_map(|key| case.metadata.get(*key))
        .and_then(|value| value.as_str())
        .unwrap_or_default()
        .to_string();

    let status = match reliability.status.as_str() {
        "all-pass" => "PASS",
        "all-fail" => "FAIL",
        "flaky" => "FLAKY",
        _ => "skip",
    };

    EvalPackStatsRow {
        name: case_id.to_string(),
        case_name: case_id.to_string(),
        case_fingerprint: case.case_fingerprint.clone(),
        harness_config_fingerprint: harness_config_fingerprint.to_string(),
        group,
        split,
        trials: trials.len(),
        passes: reliability.passes,
        fails: reliability.fails,
        skips: reliability.skips,
        timeouts: reliability.timeouts,
        pass_rate: reliability.pass_rate,
        status: status.to_string(),
        majority: reliability.majority.clone(),
        wall_time_seconds: mean_wall_time,
        cost_usd: total_cost,
        mean_wall_time_seconds: mean_wall_time,
        stdev_wall_time_seconds: stdev(&wall_times),
        total_cost_usd: total_cost,
    }
}
/// Combines the macro pass@1 score and the reliability breakdown for `rows`.
fn eval_pack_stats_report(rows: &[EvalPackStatsRow]) -> EvalPackStatsReport {
    let reliability = eval_pack_reliability_breakdown(rows);
    let macro_pass_at_1 = macro_pass_at_1(rows);
    EvalPackStatsReport {
        macro_pass_at_1,
        reliability,
    }
}
fn macro_pass_at_1(rows: &[EvalPackStatsRow]) -> f64 {
let decided = rows
.iter()
.filter(|row| row.passes + row.fails > 0)
.collect::<Vec<_>>();
if decided.is_empty() {
return 0.0;
}
decided.iter().map(|row| row.pass_rate).sum::<f64>() / decided.len() as f64
}
/// Counts rows per reliability class and reports each class as a fraction of
/// all rows.
///
/// A row is all-pass with passes but no fails, flaky with both, all-fail with
/// fails but no passes, and no-decision with neither.
fn eval_pack_reliability_breakdown(rows: &[EvalPackStatsRow]) -> EvalPackReliabilityBreakdown {
    let mut all_pass_cases = 0usize;
    let mut flaky_cases = 0usize;
    let mut all_fail_cases = 0usize;
    let mut no_decision_cases = 0usize;
    for row in rows {
        match (row.passes > 0, row.fails > 0) {
            (true, false) => all_pass_cases += 1,
            (true, true) => flaky_cases += 1,
            (false, true) => all_fail_cases += 1,
            (false, false) => no_decision_cases += 1,
        }
    }
    let total_cases = rows.len();
    EvalPackReliabilityBreakdown {
        all_pass_cases,
        flaky_cases,
        all_fail_cases,
        no_decision_cases,
        total_cases,
        all_pass_fraction: rate(all_pass_cases, total_cases),
        flaky_fraction: rate(flaky_cases, total_cases),
        all_fail_fraction: rate(all_fail_cases, total_cases),
        no_decision_fraction: rate(no_decision_cases, total_cases),
    }
}
/// Inverts the report's partition listing into a case-id -> partition map.
///
/// If a case id appears under several partitions, the one visited last wins.
fn split_by_case_id(report: &EvalPackSplitValidationReport) -> BTreeMap<String, String> {
    report
        .partitions
        .iter()
        .flat_map(|(partition, cases)| {
            cases
                .iter()
                .map(move |case_id| (case_id.clone(), partition.clone()))
        })
        .collect()
}
/// Arithmetic mean of `values`; 0.0 for an empty slice.
fn mean(values: &[f64]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => values.iter().sum::<f64>() / count as f64,
    }
}
/// Population standard deviation of `values` (divides by the element count);
/// 0.0 for an empty slice.
fn stdev(values: &[f64]) -> f64 {
    if values.is_empty() {
        return 0.0;
    }
    let center = mean(values);
    let squared_deviation_total = values
        .iter()
        .map(|value| {
            let delta = value - center;
            delta * delta
        })
        .sum::<f64>();
    (squared_deviation_total / values.len() as f64).sqrt()
}
/// Returns `count / denom` as a float, or 0.0 when `denom` is zero.
fn rate(count: usize, denom: usize) -> f64 {
    match denom {
        0 => 0.0,
        _ => count as f64 / denom as f64,
    }
}
/// Resolves the normalized severity for a case.
///
/// The first value set wins, checked in this order: the case's `severity`, the
/// case thresholds' severity, the manifest defaults' `severity`, the manifest
/// default thresholds' severity. With none set, `"blocking"` is used.
fn eval_pack_case_severity(manifest: &EvalPackManifest, case: &EvalPackCase) -> String {
    let by_precedence = [
        case.severity.as_deref(),
        case.thresholds.severity.as_deref(),
        manifest.defaults.severity.as_deref(),
        manifest.defaults.thresholds.severity.as_deref(),
    ];
    let configured = by_precedence.iter().copied().flatten().next();
    normalize_eval_pack_severity(configured.unwrap_or("blocking"))
}
/// Maps a severity label to its canonical form, ignoring ASCII case and
/// surrounding whitespace.
///
/// `warn`/`warning` become `"warning"`, `info`/`informational` become
/// `"informational"`, and anything else becomes `"blocking"`.
fn normalize_eval_pack_severity(value: &str) -> String {
    let lowered = value.trim().to_ascii_lowercase();
    let canonical = match lowered.as_str() {
        "warn" | "warning" => "warning",
        "info" | "informational" => "informational",
        _ => "blocking",
    };
    canonical.to_string()
}
/// Loads the run record a case refers to via `run` (preferred) or `run_path`.
///
/// A reference that names a registered fixture id is loaded through that
/// fixture, resolved against `fixture_base_dir`; any other reference is treated
/// as a path resolved against `base_dir`.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the case has neither `run` nor `run_path`;
/// load errors are propagated.
fn load_eval_pack_case_run(
    case: &EvalPackCase,
    base_dir: Option<&Path>,
    fixture_base_dir: Option<&Path>,
    fixtures_by_id: &BTreeMap<&str, &EvalPackFixtureRef>,
) -> Result<RunRecord, VmError> {
    let Some(run_ref) = case.run.as_deref().or(case.run_path.as_deref()) else {
        return Err(VmError::Runtime(
            "eval pack case is missing run or run_path".to_string(),
        ));
    };
    match fixtures_by_id.get(run_ref) {
        Some(fixture) => load_run_record_from_fixture_ref(fixture, fixture_base_dir),
        None => load_run_record(&resolve_manifest_path(base_dir, run_ref)),
    }
}
/// Loads the replay fixture for a case.
///
/// An explicit `fixture` (preferred) or `fixture_path` reference is loaded
/// through the fixture registry when it names a registered id, and as a path
/// under `base_dir` otherwise. Without a reference, the fixture embedded in
/// `run` is cloned, or one is derived from `run` when none is embedded.
fn load_eval_pack_case_fixture(
    case: &EvalPackCase,
    base_dir: Option<&Path>,
    fixture_base_dir: Option<&Path>,
    fixtures_by_id: &BTreeMap<&str, &EvalPackFixtureRef>,
    run: &RunRecord,
) -> Result<ReplayFixture, VmError> {
    let Some(fixture_ref) = case.fixture.as_deref().or(case.fixture_path.as_deref()) else {
        let fallback = match &run.replay_fixture {
            Some(embedded) => embedded.clone(),
            None => replay_fixture_from_run(run),
        };
        return Ok(fallback);
    };
    match fixtures_by_id.get(fixture_ref) {
        Some(fixture) => load_replay_fixture_from_ref(fixture, fixture_base_dir),
        None => load_replay_fixture(&resolve_manifest_path(base_dir, fixture_ref)),
    }
}
/// Returns the display form of the path a case's run was loaded from.
///
/// Yields `None` when the case has no run reference, or when the reference
/// names a registered fixture that has no `path`.
fn eval_pack_case_source_path(
    case: &EvalPackCase,
    base_dir: Option<&Path>,
    fixture_base_dir: Option<&Path>,
    fixtures_by_id: &BTreeMap<&str, &EvalPackFixtureRef>,
) -> Option<String> {
    let run_ref = case.run.as_deref().or(case.run_path.as_deref())?;
    let resolved = match fixtures_by_id.get(run_ref) {
        Some(fixture) => resolve_manifest_path(fixture_base_dir, fixture.path.as_deref()?),
        None => resolve_manifest_path(base_dir, run_ref),
    };
    Some(resolved.display().to_string())
}
/// Loads the friction events a case refers to via `friction_events`.
///
/// A reference that names a registered fixture id is loaded through that
/// fixture, resolved against `fixture_base_dir`; any other reference is treated
/// as a path resolved against `base_dir`.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the case has no `friction_events`; load
/// errors are propagated.
fn load_eval_pack_case_friction_events(
    case: &EvalPackCase,
    base_dir: Option<&Path>,
    fixture_base_dir: Option<&Path>,
    fixtures_by_id: &BTreeMap<&str, &EvalPackFixtureRef>,
) -> Result<Vec<FrictionEvent>, VmError> {
    let Some(event_ref) = case.friction_events.as_deref() else {
        return Err(VmError::Runtime(
            "eval pack friction case is missing friction_events".to_string(),
        ));
    };
    match fixtures_by_id.get(event_ref) {
        Some(fixture) => load_friction_events_from_fixture_ref(fixture, fixture_base_dir),
        None => load_friction_events_from_path(&resolve_manifest_path(base_dir, event_ref)),
    }
}
/// Loads friction events from a fixture reference.
///
/// An inline value takes precedence and is normalized directly; otherwise the
/// fixture's `path` is resolved against `base_dir` and loaded from disk.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the fixture has neither an inline value nor
/// a path; normalization and load errors are propagated.
fn load_friction_events_from_fixture_ref(
    fixture: &EvalPackFixtureRef,
    base_dir: Option<&Path>,
) -> Result<Vec<FrictionEvent>, VmError> {
    match &fixture.inline {
        Some(inline) => normalize_friction_events_json(inline.clone()),
        None => {
            let Some(path) = fixture.path.as_deref() else {
                return Err(VmError::Runtime(format!(
                    "fixture '{}' is missing path or inline friction events",
                    fixture.id
                )));
            };
            load_friction_events_from_path(&resolve_manifest_path(base_dir, path))
        }
    }
}
/// Reads the file at `path`, parses it as JSON, and normalizes the value into
/// friction events.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the file cannot be read or is not valid
/// JSON; normalization errors are propagated.
fn load_friction_events_from_path(path: &Path) -> Result<Vec<FrictionEvent>, VmError> {
    let raw = match std::fs::read_to_string(path) {
        Ok(raw) => raw,
        Err(e) => {
            return Err(VmError::Runtime(format!(
                "failed to read friction events fixture: {e}"
            )))
        }
    };
    let parsed = match serde_json::from_str::<serde_json::Value>(&raw) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(VmError::Runtime(format!(
                "failed to parse friction events fixture: {e}"
            )))
        }
    };
    normalize_friction_events_json(parsed)
}
/// Returns the display form of the path a case's friction events were loaded
/// from.
///
/// Yields `None` when the case has no `friction_events` reference, or when the
/// reference names a registered fixture that has no `path`.
fn eval_pack_case_friction_source_path(
    case: &EvalPackCase,
    base_dir: Option<&Path>,
    fixture_base_dir: Option<&Path>,
    fixtures_by_id: &BTreeMap<&str, &EvalPackFixtureRef>,
) -> Option<String> {
    let event_ref = case.friction_events.as_deref()?;
    let resolved = match fixtures_by_id.get(event_ref) {
        Some(fixture) => resolve_manifest_path(fixture_base_dir, fixture.path.as_deref()?),
        None => resolve_manifest_path(base_dir, event_ref),
    };
    Some(resolved.display().to_string())
}
/// Derives the context-pack suggestion options for a friction case.
///
/// Both settings are looked up in the case metadata first and the manifest
/// metadata second (by key presence):
///
/// * `min_occurrences` — used when the selected value is an unsigned integer,
///   otherwise 2. Values larger than `usize::MAX` are clamped rather than
///   wrapped, so an oversized threshold never turns into a tiny one on targets
///   where `usize` is narrower than 64 bits.
/// * `owner` — used when the selected value is a string, otherwise the
///   manifest package name (if any).
fn friction_suggestion_options(
    case: &EvalPackCase,
    manifest: &EvalPackManifest,
) -> ContextPackSuggestionOptions {
    let min_occurrences = case
        .metadata
        .get("min_occurrences")
        .or_else(|| manifest.metadata.get("min_occurrences"))
        .and_then(|value| value.as_u64())
        // Clamp before narrowing: a bare `as usize` would silently truncate.
        .map(|value| value.min(usize::MAX as u64) as usize)
        .unwrap_or(2);
    let owner = case
        .metadata
        .get("owner")
        .or_else(|| manifest.metadata.get("owner"))
        .and_then(|value| value.as_str())
        .map(str::to_string)
        .or_else(|| {
            manifest
                .package
                .as_ref()
                .and_then(|package| package.name.clone())
        });
    ContextPackSuggestionOptions {
        min_occurrences,
        owner,
    }
}
/// Appends a failure message for every configured threshold the run exceeds.
///
/// Checked in order: stage count, latency, cost, and token count (input plus
/// output). Unset thresholds are skipped, and a run without usage data is
/// treated as having default (zero) usage values.
fn apply_eval_pack_thresholds(
    run: &RunRecord,
    thresholds: &super::types::EvalPackThresholds,
    failures: &mut Vec<String>,
) {
    let usage = run.usage.as_ref();

    match thresholds.max_stage_count {
        Some(max_stage_count) if run.stages.len() > max_stage_count => {
            failures.push(format!(
                "stage count {} exceeds threshold {}",
                run.stages.len(),
                max_stage_count
            ));
        }
        _ => {}
    }

    if let Some(max_latency_ms) = thresholds.max_latency_ms {
        let actual = usage
            .map(|usage| usage.total_duration_ms)
            .unwrap_or_default();
        if actual > max_latency_ms {
            failures.push(format!(
                "latency {actual}ms exceeds threshold {max_latency_ms}ms"
            ));
        }
    }

    if let Some(max_cost_usd) = thresholds.max_cost_usd {
        let actual = usage.map(|usage| usage.total_cost).unwrap_or_default();
        if actual > max_cost_usd {
            failures.push(format!(
                "cost ${actual:.6} exceeds threshold ${max_cost_usd:.6}"
            ));
        }
    }

    if let Some(max_tokens) = thresholds.max_tokens {
        let actual = usage
            .map(|usage| usage.input_tokens + usage.output_tokens)
            .unwrap_or_default();
        if actual > max_tokens {
            failures.push(format!(
                "token count {actual} exceeds threshold {max_tokens}"
            ));
        }
    }
}
/// Applies one rubric to a run record, dispatching on the rubric kind.
///
/// * Locally checkable kinds run the thresholds and then every assertion.
/// * LLM-judge kinds are never executed here; a note is recorded as a failure
///   when the rubric severity normalizes to `"blocking"` (the default) and as
///   a warning otherwise.
/// * Any other kind only produces a warning.
fn apply_eval_pack_rubric(
    rubric: &EvalPackRubric,
    run: &RunRecord,
    failures: &mut Vec<String>,
    warnings: &mut Vec<String>,
) {
    let kind = rubric.kind.as_str();
    match kind {
        "" | "deterministic" | "replay" | "budget" | "hitl" | "side-effect" => {
            apply_eval_pack_thresholds(run, &rubric.thresholds, failures);
            rubric
                .assertions
                .iter()
                .for_each(|assertion| apply_eval_pack_assertion(rubric, assertion, run, failures));
        }
        "llm-judge" | "llm_as_judge" | "judge" => {
            let requested = rubric.thresholds.severity.as_deref().unwrap_or("blocking");
            let is_blocking = normalize_eval_pack_severity(requested) == "blocking";
            let note = format!(
                "rubric '{}' requires an external LLM judge and was not run locally",
                rubric.id
            );
            // Route the note to whichever list matches the severity.
            let sink: &mut Vec<String> = if is_blocking { failures } else { warnings };
            sink.push(note);
        }
        unknown => {
            warnings.push(format!(
                "rubric '{}' has unknown kind '{}' and was not run locally",
                rubric.id, unknown
            ));
        }
    }
}
/// Applies one rubric to the context-pack suggestions generated for a
/// friction case.
///
/// Rubric kinds that are not friction-compatible only add a warning. For the
/// supported kinds, every suggestion-style assertion is converted into an
/// expectation; assertions of any other kind are reported as failures. The
/// collected expectations are then evaluated against `suggestions` and their
/// failures appended after the unsupported-kind failures.
fn apply_eval_pack_friction_rubric(
    rubric: &EvalPackRubric,
    suggestions: &[super::super::ContextPackSuggestion],
    failures: &mut Vec<String>,
    warnings: &mut Vec<String>,
) {
    let rubric_kind = rubric.kind.as_str();
    if !matches!(
        rubric_kind,
        "" | "deterministic" | "friction" | "context-pack-suggestion"
    ) {
        warnings.push(format!(
            "rubric '{}' has unknown friction kind '{}' and was not run locally",
            rubric.id, rubric_kind
        ));
        return;
    }

    let mut expectations = Vec::new();
    for assertion in &rubric.assertions {
        let assertion_kind = assertion.kind.as_str();
        if matches!(
            assertion_kind,
            "context-pack-suggestion" | "context_pack_suggestion" | "suggestion"
        ) {
            expectations.push(context_pack_expectation_from_assertion(assertion));
        } else {
            failures.push(format!(
                "rubric '{}' has unsupported friction assertion kind '{}'",
                rubric.id, assertion_kind
            ));
        }
    }

    let expectation_failures =
        evaluate_context_pack_suggestion_expectations(suggestions, &expectations);
    failures.extend(expectation_failures);
}
/// Converts a friction assertion into a `ContextPackSuggestionExpectation`.
///
/// `assertion.expected` may be an object whose keys map onto the expectation
/// fields, or a bare string, which is used as `recommended_artifact` when the
/// object form does not supply one. `assertion.contains` takes precedence
/// over an `expected.title_contains` entry.
fn context_pack_expectation_from_assertion(
    assertion: &EvalPackAssertion,
) -> ContextPackSuggestionExpectation {
    let expected_value = assertion.expected.as_ref();
    let expected = expected_value.and_then(|value| value.as_object());

    // Reads an owned string field out of the `expected` object, if present.
    let text_field = |key: &str| -> Option<String> {
        expected
            .and_then(|map| map.get(key))
            .and_then(|value| value.as_str())
            .map(str::to_string)
    };

    let min_suggestions = expected
        .and_then(|map| map.get("min_suggestions"))
        .and_then(|value| value.as_u64())
        .map(|value| value as usize);

    let recommended_artifact = match text_field("recommended_artifact") {
        Some(artifact) => Some(artifact),
        // Shorthand form: `expected` is itself the artifact name.
        None => expected_value
            .and_then(|value| value.as_str())
            .map(str::to_string),
    };

    let title_contains = match assertion.contains.clone() {
        Some(needle) => Some(needle),
        None => text_field("title_contains"),
    };

    ContextPackSuggestionExpectation {
        min_suggestions,
        recommended_artifact,
        title_contains,
        manifest_name_contains: text_field("manifest_name_contains"),
        required_capability: text_field("required_capability"),
        required_output_slot: text_field("required_output_slot"),
    }
}
/// Evaluates a single rubric assertion against a run record.
///
/// Supported assertion kinds:
/// * `run-status` / `run_status` / `status`: the run status must equal the
///   `expected` string.
/// * `stage-status` / `stage_status`: the stage named by `stage` must exist
///   and have the `expected` status.
/// * `visible-text-contains` / `visible_text_contains`: the visible text of
///   the named stage (or of any stage when `stage` is absent) must contain
///   `contains`.
/// * `hitl-question-contains` / `hitl_question_contains`: some HITL question
///   prompt must contain `contains`.
/// * the empty kind is a no-op.
///
/// A misconfigured assertion (missing `stage`, `expected` string, or
/// `contains`) and an unsupported kind are each reported through `failures`
/// rather than being treated as a pass.
fn apply_eval_pack_assertion(
    rubric: &EvalPackRubric,
    assertion: &EvalPackAssertion,
    run: &RunRecord,
    failures: &mut Vec<String>,
) {
    match assertion.kind.as_str() {
        "run-status" | "run_status" | "status" => {
            // A status assertion without an expected string cannot check
            // anything; report it, mirroring the stage-status arm below,
            // instead of letting it pass silently.
            let expected = assertion.expected.as_ref().and_then(|value| value.as_str());
            let Some(expected) = expected else {
                failures.push(format!(
                    "rubric '{}' run-status assertion missing expected string",
                    rubric.id
                ));
                return;
            };
            if run.status != expected {
                failures.push(format!(
                    "rubric '{}' expected run status {}, got {}",
                    rubric.id, expected, run.status
                ));
            }
        }
        "stage-status" | "stage_status" => {
            let Some(stage_id) = assertion.stage.as_deref() else {
                failures.push(format!(
                    "rubric '{}' stage-status assertion missing stage",
                    rubric.id
                ));
                return;
            };
            let expected = assertion.expected.as_ref().and_then(|value| value.as_str());
            let Some(expected) = expected else {
                failures.push(format!(
                    "rubric '{}' stage-status assertion missing expected string",
                    rubric.id
                ));
                return;
            };
            // The first stage with a matching node id is the one checked.
            match run.stages.iter().find(|stage| stage.node_id == stage_id) {
                Some(stage) if stage.status == expected => {}
                Some(stage) => failures.push(format!(
                    "rubric '{}' expected stage {} status {}, got {}",
                    rubric.id, stage_id, expected, stage.status
                )),
                None => failures.push(format!(
                    "rubric '{}' expected stage {} to exist",
                    rubric.id, stage_id
                )),
            }
        }
        "visible-text-contains" | "visible_text_contains" => {
            let Some(needle) = assertion.contains.as_deref() else {
                failures.push(format!(
                    "rubric '{}' visible-text assertion missing contains",
                    rubric.id
                ));
                return;
            };
            let matched = match assertion.stage.as_deref() {
                // Scoped to one stage: a missing stage or missing text counts
                // as "not matched".
                Some(stage_id) => run
                    .stages
                    .iter()
                    .find(|stage| stage.node_id == stage_id)
                    .and_then(|stage| stage.visible_text.as_deref())
                    .is_some_and(|text| text.contains(needle)),
                // Unscoped: any stage's visible text may satisfy the assertion.
                None => run
                    .stages
                    .iter()
                    .filter_map(|stage| stage.visible_text.as_deref())
                    .any(|text| text.contains(needle)),
            };
            if !matched {
                failures.push(format!(
                    "rubric '{}' expected visible text to contain {:?}",
                    rubric.id, needle
                ));
            }
        }
        "hitl-question-contains" | "hitl_question_contains" => {
            let Some(needle) = assertion.contains.as_deref() else {
                failures.push(format!(
                    "rubric '{}' HITL assertion missing contains",
                    rubric.id
                ));
                return;
            };
            if !run
                .hitl_questions
                .iter()
                .any(|question| question.prompt.contains(needle))
            {
                failures.push(format!(
                    "rubric '{}' expected HITL question to contain {:?}",
                    rubric.id, needle
                ));
            }
        }
        // An assertion with no kind is accepted and checks nothing.
        "" => {}
        other => failures.push(format!(
            "rubric '{}' has unsupported assertion kind '{}'",
            rubric.id, other
        )),
    }
}
pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
ReplayFixture {
type_name: "replay_fixture".to_string(),
id: new_id("fixture"),
source_run_id: run.id.clone(),
workflow_id: run.workflow_id.clone(),
workflow_name: run.workflow_name.clone(),
created_at: now_rfc3339(),
eval_kind: Some("replay".to_string()),
clarifying_question: None,
expected_status: run.status.clone(),
stage_assertions: run
.stages
.iter()
.map(|stage| ReplayStageAssertion {
node_id: stage.node_id.clone(),
expected_status: stage.status.clone(),
expected_outcome: stage.outcome.clone(),
expected_branch: stage.branch.clone(),
required_artifact_kinds: stage
.artifacts
.iter()
.map(|artifact| artifact.kind.clone())
.collect(),
visible_text_contains: stage
.visible_text
.as_ref()
.filter(|text| !text.is_empty())
.map(|text| text.chars().take(80).collect()),
})
.collect(),
}
}
/// Evaluates a run record against a replay fixture.
///
/// Fixtures whose `eval_kind` is `"clarifying_question"` are delegated to
/// `evaluate_clarifying_question`. Otherwise the run status and every stage
/// assertion (status, outcome, branch, required artifact kinds, visible-text
/// snippet) are compared, and each mismatch adds one entry to the report's
/// `failures`. The report passes only when no failure was recorded.
pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    if fixture.eval_kind.as_deref() == Some("clarifying_question") {
        return evaluate_clarifying_question(run, fixture);
    }
    let mut failures = Vec::new();
    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    // Index stages by node id so each assertion is a map lookup. If several
    // stages share a node id, the last one in `run.stages` is the one kept.
    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
    for assertion in &fixture.stage_assertions {
        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
            // Nothing else can be checked for a stage that never ran.
            failures.push(format!("missing stage {}", assertion.node_id));
            continue;
        };
        if stage.status != assertion.expected_status {
            failures.push(format!(
                "stage {} status mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_status, stage.status
            ));
        }
        if stage.outcome != assertion.expected_outcome {
            failures.push(format!(
                "stage {} outcome mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_outcome, stage.outcome
            ));
        }
        if stage.branch != assertion.expected_branch {
            failures.push(format!(
                "stage {} branch mismatch: expected {:?}, got {:?}",
                assertion.node_id, assertion.expected_branch, stage.branch
            ));
        }
        for required_kind in &assertion.required_artifact_kinds {
            if !stage
                .artifacts
                .iter()
                .any(|artifact| &artifact.kind == required_kind)
            {
                failures.push(format!(
                    "stage {} missing artifact kind {}",
                    assertion.node_id, required_kind
                ));
            }
        }
        if let Some(snippet) = &assertion.visible_text_contains {
            // Borrow the stage text instead of cloning it; a stage without
            // visible text is treated as the empty string, so an empty
            // snippet still matches it.
            let actual = stage.visible_text.as_deref().unwrap_or_default();
            if !actual.contains(snippet.as_str()) {
                failures.push(format!(
                    "stage {} visible text does not contain expected snippet {:?}",
                    assertion.node_id, snippet
                ));
            }
        }
    }
    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
/// Evaluates a run against a clarifying-question fixture.
///
/// Checks the run status, checks that the number of HITL questions lies
/// within the fixture's min/max bounds, and, when the fixture specifies any
/// matching criteria and the run asked at least one question, requires that
/// some question satisfies all of them after text normalization.
fn evaluate_clarifying_question(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    let mut failures = Vec::new();
    let spec = fixture.clarifying_question.clone().unwrap_or_default();
    let min_questions = clarifying_min_questions(&spec);
    let max_questions = clarifying_max_questions(&spec);
    let questions = &run.hitl_questions;
    let question_count = questions.len();

    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    if question_count < min_questions {
        failures.push(format!(
            "expected at least {min_questions} clarifying question(s), got {}",
            question_count
        ));
    }
    if question_count > max_questions {
        failures.push(format!(
            "expected at most {max_questions} clarifying question(s), got {}",
            question_count
        ));
    }

    // Normalize every fixture-side string once, up front.
    let normalized_expected = spec
        .expected_question
        .as_deref()
        .map(normalize_question_text);
    let normalized_accepted: Vec<_> = spec
        .accepted_questions
        .iter()
        .map(|question| normalize_question_text(question))
        .collect();
    let required_terms: Vec<_> = spec
        .required_terms
        .iter()
        .map(|term| normalize_question_text(term))
        .collect();
    let forbidden_terms: Vec<_> = spec
        .forbidden_terms
        .iter()
        .map(|term| normalize_question_text(term))
        .collect();

    let matched = questions.iter().any(|question| {
        let normalized = normalize_question_text(&question.prompt);
        // NOTE(review): when both `expected_question` and `accepted_questions`
        // are set, a question has to satisfy both checks. Confirm that this
        // conjunction (rather than an either/or) is intended.
        if let Some(expected) = normalized_expected.as_ref() {
            if &normalized != expected {
                return false;
            }
        }
        if !normalized_accepted.is_empty()
            && !normalized_accepted
                .iter()
                .any(|candidate| candidate == &normalized)
        {
            return false;
        }
        if required_terms
            .iter()
            .any(|term| !normalized.contains(term.as_str()))
        {
            return false;
        }
        !forbidden_terms
            .iter()
            .any(|term| normalized.contains(term.as_str()))
    });

    let has_criteria = normalized_expected.is_some()
        || !normalized_accepted.is_empty()
        || !required_terms.is_empty()
        || !forbidden_terms.is_empty();
    // With zero questions only the count bounds above apply.
    if question_count > 0 && has_criteria && !matched {
        let actual_prompts = questions
            .iter()
            .map(|question| format!("{:?}", question.prompt))
            .collect::<Vec<_>>()
            .join(", ");
        failures.push(format!(
            "no clarifying question matched fixture; actual questions: {}",
            actual_prompts
        ));
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
pub fn evaluate_run_suite(
cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
) -> ReplayEvalSuiteReport {
let mut reports = Vec::new();
for (run, fixture, source_path) in cases {
let report = evaluate_run_against_fixture(&run, &fixture);
reports.push(ReplayEvalCaseReport {
run_id: run.id.clone(),
workflow_id: run.workflow_id.clone(),
label: None,
pass: report.pass,
failures: report.failures,
stage_count: report.stage_count,
source_path,
comparison: None,
});
}
let total = reports.len();
let passed = reports.iter().filter(|report| report.pass).count();
let failed = total.saturating_sub(passed);
ReplayEvalSuiteReport {
pass: failed == 0,
total,
passed,
failed,
cases: reports,
}
}