Skip to main content

visual_rubric/batch/
mod.rs

1mod logs;
2mod recommendations;
3mod snapshot;
4
5#[cfg(test)]
6mod tests;
7
8use std::path::{Path, PathBuf};
9use std::time::{SystemTime, UNIX_EPOCH};
10
11use serde::{Deserialize, Serialize};
12
13use crate::{PoolConfig, PoolError, RubricOptions, RubricPool, RubricVerdict};
14
15use logs::copy_logs_from_config;
16use recommendations::{aggregate_status, classify_recommendations};
17
18pub use snapshot::{AssetChange, AssetSnapshot, SelectionMode, diff_snapshots, select_changed};
19
/// Schema version written into the `schema_version` field of every
/// [`BatchRubricReport`] built by this module.
const REPORT_SCHEMA_VERSION: u32 = 1;
21
22/// Overall status for a batch rubric report.
23#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)]
24#[serde(rename_all = "snake_case")]
25pub enum AggregateStatus {
26    /// At least one selected asset had an evaluation error or was not evaluated after a pool error.
27    Error,
28    /// At least one selected asset failed the rubric.
29    Fail,
30    /// At least one selected asset passed and no selected asset failed or errored.
31    Pass,
32    /// No asset was selected for evaluation.
33    Skipped,
34}
35
36/// Serializable batch rubric report.
37#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
38pub struct BatchRubricReport {
39    /// Version of this JSON report schema.
40    pub schema_version: u32,
41    /// Aggregate status computed as `error > fail > pass > skipped`.
42    pub aggregate_status: AggregateStatus,
43    /// UNIX timestamp in seconds when evaluation started.
44    pub started_at: String,
45    /// UNIX timestamp in seconds when evaluation finished.
46    pub finished_at: String,
47    /// Worker count requested for the batch.
48    pub workers: usize,
49    /// Effective default options supplied to the pool.
50    pub options: RubricOptions,
51    /// Copied ACP log paths.
52    pub logs: Vec<String>,
53    /// Log capture failure, when ACP logs were requested but could not be copied.
54    pub log_capture_error: Option<String>,
55    /// Per-asset results, including skipped unchanged/deleted assets.
56    pub assets: Vec<AssetRubricReport>,
57    /// Optional caller-classified recommendations derived from failed/error assets.
58    pub recommendations: Vec<IssueRecommendation>,
59}
60
61/// Per-asset report item.
62#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
63pub struct AssetRubricReport {
64    /// Asset path, using caller-provided path representation.
65    pub path: String,
66    /// Snapshot change status.
67    pub change: String,
68    /// Whether this asset was submitted to the rubric evaluator.
69    pub selected: bool,
70    /// Rubric evaluation result or skipped reason.
71    pub result: AssetRubricResult,
72}
73
74/// Per-asset rubric result.
75#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
76#[serde(tag = "status", rename_all = "snake_case")]
77pub enum AssetRubricResult {
78    /// Rubric passed.
79    Pass {
80        /// Verdict reason.
81        reason: String,
82        /// Reported anomalies.
83        anomalies: Vec<String>,
84    },
85    /// Rubric failed.
86    Fail {
87        /// Verdict reason.
88        reason: String,
89        /// Reported anomalies.
90        anomalies: Vec<String>,
91    },
92    /// Rubric evaluation errored.
93    Error {
94        /// Error message.
95        message: String,
96    },
97    /// Asset was selected but not evaluated after a fatal pool-level error.
98    NotEvaluatedAfterError {
99        /// Root error that aborted the remaining batch.
100        root_error: String,
101        /// Suggested retry action.
102        retry_hint: String,
103    },
104    /// Asset was not selected for evaluation.
105    Skipped {
106        /// Skip reason.
107        reason: String,
108    },
109}
110
111/// Severity for a caller-provided recommendation.
112#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord)]
113#[serde(rename_all = "snake_case")]
114pub enum RecommendationSeverity {
115    /// Low severity.
116    Low,
117    /// Medium severity.
118    Medium,
119    /// High severity.
120    High,
121}
122
123/// Caller-provided issue recommendation.
124#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
125pub struct IssueRecommendation {
126    /// Stable recommendation id.
127    pub id: String,
128    /// Machine-readable recommendation class.
129    pub class: String,
130    /// Recommendation severity.
131    pub severity: RecommendationSeverity,
132    /// Affected asset paths.
133    pub affected_assets: Vec<String>,
134    /// Short evidence snippets.
135    pub evidence: Vec<String>,
136    /// Suggested generic fix.
137    pub suggested_fix: String,
138    /// Candidate modules or subsystems, if known to the caller.
139    pub candidate_modules: Vec<String>,
140}
141
/// Input passed to caller-provided issue classifiers.
///
/// Both fields borrow data owned elsewhere for the lifetime `'a`; nothing is
/// copied to build this value. It is handed to [`IssueClassifier::classify`]
/// by value.
#[derive(Debug)]
pub struct IssueClassificationInput<'a> {
    /// Failed or errored asset report.
    pub asset: &'a AssetRubricReport,
    /// Combined failure text available for classification.
    // NOTE(review): how this text is assembled is not visible in this file —
    // see the `recommendations` module.
    pub issue_text: &'a str,
}
150
/// Optional caller hook for mapping failed/error assets to reusable recommendations.
///
/// A classifier is supplied through [`BatchRubricConfig::classifier`] as a
/// borrowed trait object.
pub trait IssueClassifier {
    /// Classifies one failed/error asset.
    ///
    /// Returns the recommendations that apply to `input`; the vector may be
    /// empty.
    fn classify(&self, input: IssueClassificationInput<'_>) -> Vec<IssueRecommendation>;
}
156
/// Batch rubric runner configuration.
///
/// `Debug` is implemented by hand for this type because the classifier is a
/// trait object without a `Debug` bound.
pub struct BatchRubricConfig<'a> {
    /// Pool configuration. It is cloned to start the worker pool, and its
    /// `workers`, `default_options` and `log_capture` fields are also read
    /// when the report is assembled.
    pub pool: PoolConfig,
    /// Question sent for every selected asset.
    pub question: String,
    /// Selection mode for unchanged assets.
    pub selection_mode: SelectionMode,
    /// Optional issue classifier.
    // NOTE(review): presumably `None` yields no recommendations — confirm in
    // the `recommendations` module.
    pub classifier: Option<&'a dyn IssueClassifier>,
}
168
169impl std::fmt::Debug for BatchRubricConfig<'_> {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("BatchRubricConfig")
172            .field("pool", &self.pool)
173            .field("question", &self.question)
174            .field("selection_mode", &self.selection_mode)
175            .field("classifier", &self.classifier.map(|_| "<classifier>"))
176            .finish()
177    }
178}
179
/// Generic batch runner around [`RubricPool`].
///
/// Holds only the configuration; a pool is created inside each run, not when
/// the runner is constructed.
#[derive(Debug)]
pub struct BatchRubricRun<'a> {
    /// Configuration every run of this runner uses.
    config: BatchRubricConfig<'a>,
}
185
186impl<'a> BatchRubricRun<'a> {
187    /// Creates a batch runner.
188    #[must_use]
189    pub fn new(config: BatchRubricConfig<'a>) -> Self {
190        Self { config }
191    }
192
193    /// Evaluates selected assets from a snapshot diff.
194    ///
195    /// Pool startup and per-asset failures are represented inside the returned
196    /// report so callers can serialize the report before deciding whether to
197    /// fail their own command.
198    #[must_use]
199    pub fn run(&self, changes: &[AssetChange]) -> BatchRubricReport {
200        self.run_with_evaluator(changes, None)
201    }
202
203    fn run_with_evaluator(
204        &self,
205        changes: &[AssetChange],
206        evaluator: Option<&dyn BatchEvaluator>,
207    ) -> BatchRubricReport {
208        let started_at = unix_timestamp();
209        let selected = select_changed(changes, self.config.selection_mode);
210        let mut assets = Vec::with_capacity(changes.len());
211        let selected_evaluation = if selected.is_empty() {
212            SelectedEvaluation::default()
213        } else if let Some(evaluator) = evaluator {
214            evaluate_selected(evaluator, changes, &selected, &self.config.question)
215        } else {
216            match RubricPool::new(self.config.pool.clone()) {
217                Ok(pool) => {
218                    let evaluation =
219                        evaluate_selected(&pool, changes, &selected, &self.config.question);
220                    if evaluation.aborted {
221                        drop(pool);
222                    } else {
223                        let _stats = pool.shutdown();
224                    }
225                    evaluation
226                }
227                Err(error) => selected
228                    .iter()
229                    .map(|path| {
230                        AssetRubricReport::selected(
231                            path,
232                            status_for_selected(path, changes),
233                            AssetRubricResult::Error {
234                                message: format!("start visual-rubric worker pool: {error}"),
235                            },
236                        )
237                    })
238                    .collect::<Vec<_>>()
239                    .into(),
240            }
241        };
242        assets.extend(selected_evaluation.reports);
243        assets.extend(skipped_asset_reports(changes, self.config.selection_mode));
244        assets.sort_by(|left, right| left.path.cmp(&right.path));
245        let (logs, log_capture_error) =
246            match copy_logs_from_config(self.config.pool.log_capture.as_ref()) {
247                Ok(logs) => (logs, None),
248                Err(error) => (
249                    Vec::new(),
250                    Some(format!(
251                        "copy configured ACP logs into batch report: {error}"
252                    )),
253                ),
254            };
255        let recommendations = classify_recommendations(self.config.classifier, &assets);
256        let aggregate_status = aggregate_status(&assets);
257        BatchRubricReport {
258            schema_version: REPORT_SCHEMA_VERSION,
259            aggregate_status,
260            started_at,
261            finished_at: unix_timestamp(),
262            workers: self.config.pool.workers,
263            options: self.config.pool.default_options.clone(),
264            logs,
265            log_capture_error,
266            assets,
267            recommendations,
268        }
269    }
270}
271
/// Backend that evaluates one asset at a time.
///
/// The batch runner talks to this trait so an alternative evaluator can be
/// passed to `run_with_evaluator` instead of a real [`RubricPool`]
/// (presumably for tests — confirm in the `tests` module).
trait BatchEvaluator {
    /// Evaluates the asset at `png_path` against `question`, returning the
    /// verdict or the pool error that prevented one.
    fn submit_asset(&self, png_path: &Path, question: &str) -> Result<RubricVerdict, PoolError>;
}
275
276impl BatchEvaluator for RubricPool {
277    fn submit_asset(&self, png_path: &Path, question: &str) -> Result<RubricVerdict, PoolError> {
278        self.submit(png_path, question, RubricOptions::default())
279    }
280}
281
/// Result of evaluating the selected assets of one batch.
///
/// The derived default (no reports, not aborted) is what the runner uses when
/// nothing was selected.
#[derive(Default)]
struct SelectedEvaluation {
    /// One report per selected asset, in selection order.
    reports: Vec<AssetRubricReport>,
    /// `true` when an error classified as fatal by `should_abort_after_error`
    /// ended the batch; the runner then drops the pool instead of shutting it
    /// down.
    aborted: bool,
}
287
288impl From<Vec<AssetRubricReport>> for SelectedEvaluation {
289    fn from(reports: Vec<AssetRubricReport>) -> Self {
290        Self {
291            reports,
292            aborted: false,
293        }
294    }
295}
296
297fn evaluate_selected(
298    evaluator: &dyn BatchEvaluator,
299    changes: &[AssetChange],
300    selected: &[PathBuf],
301    question: &str,
302) -> SelectedEvaluation {
303    let mut reports = Vec::with_capacity(selected.len());
304    let mut abort_message = None;
305    let mut aborted = false;
306    for (index, path) in selected.iter().enumerate() {
307        let Some(message) = abort_message.as_ref() else {
308            let result = match evaluator.submit_asset(path, question) {
309                Ok(verdict) => result_from_verdict(verdict),
310                Err(error) => {
311                    let message = error.to_string();
312                    if should_abort_after_error(&error) {
313                        aborted = true;
314                        abort_message = Some(message.clone());
315                    }
316                    AssetRubricResult::Error { message }
317                }
318            };
319            reports.push(AssetRubricReport::selected(
320                path,
321                status_for_selected(path, changes),
322                result,
323            ));
324            continue;
325        };
326
327        reports.extend(selected[index..].iter().map(|remaining| {
328            AssetRubricReport::selected(
329                remaining,
330                status_for_selected(remaining, changes),
331                AssetRubricResult::NotEvaluatedAfterError {
332                    root_error: message.clone(),
333                    retry_hint: retry_hint_after_pool_error(message).to_owned(),
334                },
335            )
336        }));
337        break;
338    }
339    SelectedEvaluation { reports, aborted }
340}
341
342fn result_from_verdict(verdict: RubricVerdict) -> AssetRubricResult {
343    if verdict.verdict.is_pass() {
344        AssetRubricResult::Pass {
345            reason: verdict.reason,
346            anomalies: verdict.anomalies,
347        }
348    } else {
349        AssetRubricResult::Fail {
350            reason: verdict.reason,
351            anomalies: verdict.anomalies,
352        }
353    }
354}
355
356fn should_abort_after_error(error: &PoolError) -> bool {
357    matches!(
358        error,
359        PoolError::Timeout { .. }
360            | PoolError::QuotaExceeded
361            | PoolError::WorkerCrashed { .. }
362            | PoolError::NoLiveWorkers
363            | PoolError::Closed
364    )
365}
366
/// Picks the retry hint attached to assets that were not evaluated after a
/// fatal pool error.
///
/// `message` is the rendered text of the error that aborted the batch. It is
/// classified by substring, ignoring ASCII case:
///
/// * contains `"timed out"` or `"timeout"` — timeout hint;
/// * otherwise contains `"quota"` — quota hint;
/// * anything else, including an empty message — generic worker-error hint.
fn retry_hint_after_pool_error(message: &str) -> &'static str {
    // Compare case-insensitively so capitalized error text ("Timeout ...",
    // "Quota exceeded") still gets the specific hint.
    let normalized = message.to_ascii_lowercase();
    if normalized.contains("timed out") || normalized.contains("timeout") {
        "Asset was not evaluated after an ACP timeout; inspect captured logs and rerun with fewer workers or a smaller asset scope."
    } else if normalized.contains("quota") {
        "Asset was not evaluated after an ACP quota error; inspect captured logs, wait for quota recovery or switch credentials, then rerun the batch."
    } else {
        "Asset was not evaluated after an ACP worker error; inspect captured logs and rerun with fewer workers or a smaller asset scope."
    }
}
376
377fn status_for_selected(path: &Path, changes: &[AssetChange]) -> &'static str {
378    changes
379        .iter()
380        .find_map(|change| (change.path() == path).then_some(change.status()))
381        .map_or("selected", |status| status)
382}
383
384fn skipped_asset_reports(
385    changes: &[AssetChange],
386    selection_mode: SelectionMode,
387) -> Vec<AssetRubricReport> {
388    let mut reports = Vec::with_capacity(changes.len());
389    for change in changes {
390        match (change, selection_mode) {
391            (AssetChange::Unchanged(path), SelectionMode::ChangedOnly) => {
392                reports.push(AssetRubricReport::skipped(
393                    path,
394                    "unchanged",
395                    "asset content did not change during this command",
396                ));
397            }
398            (AssetChange::Deleted(path), _) => {
399                reports.push(AssetRubricReport::skipped(
400                    path,
401                    "deleted",
402                    "asset was deleted before evaluation",
403                ));
404            }
405            (AssetChange::Added(_) | AssetChange::Changed(_) | AssetChange::Unchanged(_), _) => {}
406        }
407    }
408    reports
409}
410
411impl AssetRubricReport {
412    fn selected(path: &Path, change: &str, result: AssetRubricResult) -> Self {
413        Self {
414            path: path.to_string_lossy().into_owned(),
415            change: change.to_owned(),
416            selected: true,
417            result,
418        }
419    }
420
421    fn skipped(path: &Path, change: &str, reason: &str) -> Self {
422        Self {
423            path: path.to_string_lossy().into_owned(),
424            change: change.to_owned(),
425            selected: false,
426            result: AssetRubricResult::Skipped {
427                reason: reason.to_owned(),
428            },
429        }
430    }
431}
432
/// Returns the current time as whole seconds since the UNIX epoch, rendered
/// as a decimal string.
///
/// Yields `"0"` when the system clock reports a time before the epoch.
fn unix_timestamp() -> String {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs().to_string(),
        Err(_) => String::from("0"),
    }
}