Skip to main content

forge_pipeline/
driver.rs

1//! Pipeline driver.
2//!
3//! Run a parsed IR through a chain of transformer plugins, then a single
4//! generator plugin, collecting diagnostics and respecting the halt-on-error
5//! policy.
6//!
7//! The driver does **not** call the parser: it accepts an already-parsed
8//! [`forge_ir::Ir`]. This keeps `forge-pipeline` independent of the spec
9//! parser, so it composes cleanly in tests and in the test harness.
10
11use forge_host::{Engine, GenerationOutput, Limits, Plugin, StageError};
12use forge_ir::{Diagnostic, Ir, Severity};
13
14#[derive(Debug, Clone)]
15pub struct PipelineConfig {
16    /// JSON config strings per stage. Indexed parallel to `transformers`,
17    /// followed by the generator config at the end. Stages with no config
18    /// receive the empty object `"{}"`.
19    pub configs: Vec<String>,
20    pub policy: StagePolicy,
21    /// Sandbox limits applied to every transformer stage.
22    pub transformer_limits: Limits,
23    /// Sandbox limits applied to the generator stage.
24    pub generator_limits: Limits,
25}
26
27impl Default for PipelineConfig {
28    fn default() -> Self {
29        Self {
30            configs: Vec::new(),
31            policy: StagePolicy::default(),
32            transformer_limits: Limits::transformer(),
33            generator_limits: Limits::generator(),
34        }
35    }
36}
37
38/// Halt-on-error policy. Default halts the pipeline before the next stage
39/// if the preceding stage produced any `error`-severity diagnostics.
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
41pub enum StagePolicy {
42    #[default]
43    HaltOnError,
44    AllowErrors,
45}
46
47#[derive(Debug)]
48pub struct PipelineOutput {
49    pub generation: GenerationOutput,
50    /// Aggregated diagnostics from every stage, in execution order.
51    pub diagnostics: Vec<Diagnostic>,
52}
53
54#[derive(Debug, thiserror::Error)]
55pub enum PipelineError {
56    #[error("transformer `{plugin}` failed: {source}")]
57    Transformer {
58        plugin: String,
59        #[source]
60        source: StageError,
61    },
62    #[error("generator `{plugin}` failed: {source}")]
63    Generator {
64        plugin: String,
65        #[source]
66        source: StageError,
67    },
68    #[error("stage `{plugin}` produced {count} error-severity diagnostics; halting")]
69    StageErrors {
70        plugin: String,
71        count: usize,
72        /// The halting stage's diagnostics, carried out so callers can
73        /// render them (the CLI prints each one) instead of being told
74        /// only how many there were.
75        diagnostics: Vec<Diagnostic>,
76    },
77}
78
79/// Run the configured pipeline.
80///
81/// `transformers` are applied in order. The result is then fed to
82/// `generator`. Diagnostics are aggregated and surfaced via
83/// [`PipelineOutput::diagnostics`]; the generator's files come back via
84/// [`PipelineOutput::generation`].
85pub fn run(
86    _engine: &Engine,
87    spec: Ir,
88    transformers: &[&Plugin],
89    generator: &Plugin,
90    cfg: &PipelineConfig,
91) -> Result<PipelineOutput, PipelineError> {
92    let mut diagnostics: Vec<Diagnostic> = Vec::new();
93    let mut current = spec;
94
95    let empty = "{}".to_string();
96    let cfg_or_empty = |i: usize| cfg.configs.get(i).unwrap_or(&empty).clone();
97
98    for (i, t) in transformers.iter().enumerate() {
99        let stage_cfg = cfg_or_empty(i);
100        let out = t
101            .transform(current, &stage_cfg, cfg.transformer_limits)
102            .map_err(|e| PipelineError::Transformer {
103                plugin: t.info().name.clone(),
104                source: e,
105            })?;
106        let err_count = out
107            .diagnostics
108            .iter()
109            .filter(|d| d.severity == Severity::Error)
110            .count();
111        if err_count > 0 && cfg.policy == StagePolicy::HaltOnError {
112            return Err(PipelineError::StageErrors {
113                plugin: t.info().name.clone(),
114                count: err_count,
115                diagnostics: out.diagnostics,
116            });
117        }
118        diagnostics.extend(out.diagnostics);
119        current = out.spec;
120    }
121
122    let gen_cfg = cfg_or_empty(transformers.len());
123    let gen_out = generator
124        .generate(current, &gen_cfg, cfg.generator_limits)
125        .map_err(|e| PipelineError::Generator {
126            plugin: generator.info().name.clone(),
127            source: e,
128        })?;
129    let err_count = gen_out
130        .diagnostics
131        .iter()
132        .filter(|d| d.severity == Severity::Error)
133        .count();
134    if err_count > 0 && cfg.policy == StagePolicy::HaltOnError {
135        return Err(PipelineError::StageErrors {
136            plugin: generator.info().name.clone(),
137            count: err_count,
138            diagnostics: gen_out.diagnostics,
139        });
140    }
141    diagnostics.extend(gen_out.diagnostics.iter().cloned());
142
143    Ok(PipelineOutput {
144        generation: gen_out,
145        diagnostics,
146    })
147}