mod logs;
mod recommendations;
mod snapshot;
#[cfg(test)]
mod tests;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::{PoolConfig, PoolError, RubricOptions, RubricPool, RubricVerdict};
use logs::copy_logs_from_config;
use recommendations::{aggregate_status, classify_recommendations};
pub use snapshot::{AssetChange, AssetSnapshot, SelectionMode, diff_snapshots, select_changed};
const REPORT_SCHEMA_VERSION: u32 = 1;
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AggregateStatus {
Error,
Fail,
Pass,
Skipped,
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct BatchRubricReport {
pub schema_version: u32,
pub aggregate_status: AggregateStatus,
pub started_at: String,
pub finished_at: String,
pub workers: usize,
pub options: RubricOptions,
pub logs: Vec<String>,
pub log_capture_error: Option<String>,
pub assets: Vec<AssetRubricReport>,
pub recommendations: Vec<IssueRecommendation>,
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct AssetRubricReport {
pub path: String,
pub change: String,
pub selected: bool,
pub result: AssetRubricResult,
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum AssetRubricResult {
Pass {
reason: String,
anomalies: Vec<String>,
},
Fail {
reason: String,
anomalies: Vec<String>,
},
Error {
message: String,
},
NotEvaluatedAfterError {
root_error: String,
retry_hint: String,
},
Skipped {
reason: String,
},
}
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum RecommendationSeverity {
Low,
Medium,
High,
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct IssueRecommendation {
pub id: String,
pub class: String,
pub severity: RecommendationSeverity,
pub affected_assets: Vec<String>,
pub evidence: Vec<String>,
pub suggested_fix: String,
pub candidate_modules: Vec<String>,
}
#[derive(Debug)]
pub struct IssueClassificationInput<'a> {
pub asset: &'a AssetRubricReport,
pub issue_text: &'a str,
}
pub trait IssueClassifier {
fn classify(&self, input: IssueClassificationInput<'_>) -> Vec<IssueRecommendation>;
}
pub struct BatchRubricConfig<'a> {
pub pool: PoolConfig,
pub question: String,
pub selection_mode: SelectionMode,
pub classifier: Option<&'a dyn IssueClassifier>,
}
impl std::fmt::Debug for BatchRubricConfig<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BatchRubricConfig")
.field("pool", &self.pool)
.field("question", &self.question)
.field("selection_mode", &self.selection_mode)
.field("classifier", &self.classifier.map(|_| "<classifier>"))
.finish()
}
}
#[derive(Debug)]
pub struct BatchRubricRun<'a> {
config: BatchRubricConfig<'a>,
}
impl<'a> BatchRubricRun<'a> {
#[must_use]
pub fn new(config: BatchRubricConfig<'a>) -> Self {
Self { config }
}
#[must_use]
pub fn run(&self, changes: &[AssetChange]) -> BatchRubricReport {
self.run_with_evaluator(changes, None)
}
fn run_with_evaluator(
&self,
changes: &[AssetChange],
evaluator: Option<&dyn BatchEvaluator>,
) -> BatchRubricReport {
let started_at = unix_timestamp();
let selected = select_changed(changes, self.config.selection_mode);
let mut assets = Vec::with_capacity(changes.len());
let selected_evaluation = if selected.is_empty() {
SelectedEvaluation::default()
} else if let Some(evaluator) = evaluator {
evaluate_selected(evaluator, changes, &selected, &self.config.question)
} else {
match RubricPool::new(self.config.pool.clone()) {
Ok(pool) => {
let evaluation =
evaluate_selected(&pool, changes, &selected, &self.config.question);
if evaluation.aborted {
drop(pool);
} else {
let _stats = pool.shutdown();
}
evaluation
}
Err(error) => selected
.iter()
.map(|path| {
AssetRubricReport::selected(
path,
status_for_selected(path, changes),
AssetRubricResult::Error {
message: format!("start visual-rubric worker pool: {error}"),
},
)
})
.collect::<Vec<_>>()
.into(),
}
};
assets.extend(selected_evaluation.reports);
assets.extend(skipped_asset_reports(changes, self.config.selection_mode));
assets.sort_by(|left, right| left.path.cmp(&right.path));
let (logs, log_capture_error) =
match copy_logs_from_config(self.config.pool.log_capture.as_ref()) {
Ok(logs) => (logs, None),
Err(error) => (
Vec::new(),
Some(format!(
"copy configured ACP logs into batch report: {error}"
)),
),
};
let recommendations = classify_recommendations(self.config.classifier, &assets);
let aggregate_status = aggregate_status(&assets);
BatchRubricReport {
schema_version: REPORT_SCHEMA_VERSION,
aggregate_status,
started_at,
finished_at: unix_timestamp(),
workers: self.config.pool.workers,
options: self.config.pool.default_options.clone(),
logs,
log_capture_error,
assets,
recommendations,
}
}
}
trait BatchEvaluator {
fn submit_asset(&self, png_path: &Path, question: &str) -> Result<RubricVerdict, PoolError>;
}
impl BatchEvaluator for RubricPool {
fn submit_asset(&self, png_path: &Path, question: &str) -> Result<RubricVerdict, PoolError> {
self.submit(png_path, question, RubricOptions::default())
}
}
#[derive(Default)]
struct SelectedEvaluation {
reports: Vec<AssetRubricReport>,
aborted: bool,
}
impl From<Vec<AssetRubricReport>> for SelectedEvaluation {
fn from(reports: Vec<AssetRubricReport>) -> Self {
Self {
reports,
aborted: false,
}
}
}
fn evaluate_selected(
evaluator: &dyn BatchEvaluator,
changes: &[AssetChange],
selected: &[PathBuf],
question: &str,
) -> SelectedEvaluation {
let mut reports = Vec::with_capacity(selected.len());
let mut abort_message = None;
let mut aborted = false;
for (index, path) in selected.iter().enumerate() {
let Some(message) = abort_message.as_ref() else {
let result = match evaluator.submit_asset(path, question) {
Ok(verdict) => result_from_verdict(verdict),
Err(error) => {
let message = error.to_string();
if should_abort_after_error(&error) {
aborted = true;
abort_message = Some(message.clone());
}
AssetRubricResult::Error { message }
}
};
reports.push(AssetRubricReport::selected(
path,
status_for_selected(path, changes),
result,
));
continue;
};
reports.extend(selected[index..].iter().map(|remaining| {
AssetRubricReport::selected(
remaining,
status_for_selected(remaining, changes),
AssetRubricResult::NotEvaluatedAfterError {
root_error: message.clone(),
retry_hint: retry_hint_after_pool_error(message).to_owned(),
},
)
}));
break;
}
SelectedEvaluation { reports, aborted }
}
fn result_from_verdict(verdict: RubricVerdict) -> AssetRubricResult {
if verdict.verdict.is_pass() {
AssetRubricResult::Pass {
reason: verdict.reason,
anomalies: verdict.anomalies,
}
} else {
AssetRubricResult::Fail {
reason: verdict.reason,
anomalies: verdict.anomalies,
}
}
}
fn should_abort_after_error(error: &PoolError) -> bool {
matches!(
error,
PoolError::Timeout { .. }
| PoolError::QuotaExceeded
| PoolError::WorkerCrashed { .. }
| PoolError::NoLiveWorkers
| PoolError::Closed
)
}
fn retry_hint_after_pool_error(message: &str) -> &'static str {
if message.contains("timed out") || message.contains("timeout") {
"Asset was not evaluated after an ACP timeout; inspect captured logs and rerun with fewer workers or a smaller asset scope."
} else if message.contains("quota") {
"Asset was not evaluated after an ACP quota error; inspect captured logs, wait for quota recovery or switch credentials, then rerun the batch."
} else {
"Asset was not evaluated after an ACP worker error; inspect captured logs and rerun with fewer workers or a smaller asset scope."
}
}
fn status_for_selected(path: &Path, changes: &[AssetChange]) -> &'static str {
changes
.iter()
.find_map(|change| (change.path() == path).then_some(change.status()))
.map_or("selected", |status| status)
}
fn skipped_asset_reports(
changes: &[AssetChange],
selection_mode: SelectionMode,
) -> Vec<AssetRubricReport> {
let mut reports = Vec::with_capacity(changes.len());
for change in changes {
match (change, selection_mode) {
(AssetChange::Unchanged(path), SelectionMode::ChangedOnly) => {
reports.push(AssetRubricReport::skipped(
path,
"unchanged",
"asset content did not change during this command",
));
}
(AssetChange::Deleted(path), _) => {
reports.push(AssetRubricReport::skipped(
path,
"deleted",
"asset was deleted before evaluation",
));
}
(AssetChange::Added(_) | AssetChange::Changed(_) | AssetChange::Unchanged(_), _) => {}
}
}
reports
}
impl AssetRubricReport {
fn selected(path: &Path, change: &str, result: AssetRubricResult) -> Self {
Self {
path: path.to_string_lossy().into_owned(),
change: change.to_owned(),
selected: true,
result,
}
}
fn skipped(path: &Path, change: &str, reason: &str) -> Self {
Self {
path: path.to_string_lossy().into_owned(),
change: change.to_owned(),
selected: false,
result: AssetRubricResult::Skipped {
reason: reason.to_owned(),
},
}
}
}
fn unix_timestamp() -> String {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs().to_string())
.unwrap_or_else(|_| "0".to_owned())
}