greentic_conformance/
flow_suite.rs

1use std::{
2    collections::HashSet,
3    fs,
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use anyhow::{bail, Context, Result};
9use serde::{Deserialize, Serialize};
10use walkdir::WalkDir;
11
12/// Options that tweak flow validation behaviour.
13pub struct FlowValidationOptions {
14    pub allowed_extensions: Vec<String>,
15    pub require_schema: bool,
16    validators: Vec<Arc<dyn FlowValidator>>,
17}
18
19impl Clone for FlowValidationOptions {
20    fn clone(&self) -> Self {
21        Self {
22            allowed_extensions: self.allowed_extensions.clone(),
23            require_schema: self.require_schema,
24            validators: self.validators.clone(),
25        }
26    }
27}
28
29impl std::fmt::Debug for FlowValidationOptions {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("FlowValidationOptions")
32            .field("allowed_extensions", &self.allowed_extensions)
33            .field("require_schema", &self.require_schema)
34            .field("validators", &self.validators.len())
35            .finish()
36    }
37}
38
39impl Default for FlowValidationOptions {
40    fn default() -> Self {
41        Self {
42            allowed_extensions: vec!["ygtc".into(), "yaml".into(), "yml".into(), "json".into()],
43            require_schema: true,
44            validators: Vec::new(),
45        }
46    }
47}
48
49/// Custom validator hook that can enforce additional flow invariants.
50pub trait FlowValidator: Send + Sync {
51    fn validate(&self, flow: &FlowDocument) -> Result<()>;
52}
53
54impl<T> FlowValidator for T
55where
56    T: Fn(&FlowDocument) -> Result<()> + Send + Sync,
57{
58    fn validate(&self, flow: &FlowDocument) -> Result<()> {
59        (self)(flow)
60    }
61}
62
63/// A validated flow document.
64#[derive(Debug, Clone, Deserialize, Serialize)]
65pub struct FlowDocument {
66    pub id: String,
67    #[serde(default)]
68    pub name: Option<String>,
69    #[serde(default)]
70    pub summary: Option<String>,
71    #[serde(default)]
72    pub schema: Option<serde_json::Value>,
73    pub nodes: Vec<FlowNode>,
74}
75
76/// A single node in a flow document.
77#[derive(Debug, Clone, Deserialize, Serialize)]
78pub struct FlowNode {
79    pub id: String,
80    #[serde(alias = "type")]
81    pub kind: String,
82    #[serde(default)]
83    pub description: Option<String>,
84    #[serde(default)]
85    pub metadata: Option<serde_json::Value>,
86}
87
88/// Report returned after validating a folder of flows.
89#[derive(Debug, Clone)]
90pub struct FlowValidationReport {
91    pub root: PathBuf,
92    pub flows: Vec<FlowDocument>,
93}
94
95/// Validates all flow documents inside the provided path using default options.
96pub fn validate_flow_folder(path: &str) -> Result<FlowValidationReport> {
97    FlowValidationOptions::default().validate_flow_folder(path)
98}
99
100impl FlowValidationOptions {
101    /// Replaces the list of allowed extensions.
102    pub fn with_allowed_extensions<I, S>(mut self, extensions: I) -> Self
103    where
104        I: IntoIterator<Item = S>,
105        S: Into<String>,
106    {
107        self.allowed_extensions = extensions
108            .into_iter()
109            .map(|ext| ext.into().to_ascii_lowercase())
110            .collect();
111        self
112    }
113
114    /// Adds one extra extension (case-insensitive) to the allow list.
115    pub fn allow_extension(mut self, extension: impl Into<String>) -> Self {
116        let extension = extension.into().to_ascii_lowercase();
117        if !self.allowed_extensions.iter().any(|ext| ext == &extension) {
118            self.allowed_extensions.push(extension);
119        }
120        self
121    }
122
123    /// Whether a schema definition must be present in every flow.
124    pub fn require_schema(mut self, required: bool) -> Self {
125        self.require_schema = required;
126        self
127    }
128
129    /// Allows flows to omit a schema definition.
130    pub fn allow_missing_schema(self) -> Self {
131        self.require_schema(false)
132    }
133
134    /// Registers an additional validator that will run on each flow document.
135    pub fn add_validator<V>(mut self, validator: V) -> Self
136    where
137        V: FlowValidator + 'static,
138    {
139        self.validators.push(Arc::new(validator));
140        self
141    }
142
143    /// Replaces any registered validators with the provided set.
144    pub fn with_validators<I, V>(mut self, validators: I) -> Self
145    where
146        I: IntoIterator<Item = V>,
147        V: FlowValidator + 'static,
148    {
149        self.validators = validators
150            .into_iter()
151            .map(|validator| Arc::new(validator) as Arc<dyn FlowValidator>)
152            .collect();
153        self
154    }
155
156    /// Validates all flow documents inside the provided path.
157    pub fn validate_flow_folder(&self, path: impl AsRef<Path>) -> Result<FlowValidationReport> {
158        let path = path.as_ref();
159        if !path.exists() {
160            bail!("flow path '{}' does not exist", path.display());
161        }
162
163        let mut flows = Vec::new();
164        let mut any_files = false;
165
166        for entry in WalkDir::new(path).into_iter().flatten() {
167            if !entry.file_type().is_file() {
168                continue;
169            }
170
171            let ext = entry
172                .path()
173                .extension()
174                .and_then(|s| s.to_str())
175                .map(|s| s.to_ascii_lowercase())
176                .unwrap_or_default();
177
178            if !self
179                .allowed_extensions
180                .iter()
181                .any(|allowed| allowed == &ext)
182            {
183                continue;
184            }
185
186            any_files = true;
187
188            let raw = fs::read_to_string(entry.path())
189                .with_context(|| format!("failed to read flow file {}", entry.path().display()))?;
190            let document = parse_flow_document(entry.path(), &raw)?;
191            validate_flow(&document, self.require_schema)
192                .with_context(|| format!("invalid flow document {}", entry.path().display()))?;
193            for validator in &self.validators {
194                validator.validate(&document).with_context(|| {
195                    format!("custom validator failed for {}", entry.path().display())
196                })?;
197            }
198            flows.push(document);
199        }
200
201        if !any_files {
202            bail!(
203                "no flow definitions found under '{}' (expected extensions: {:?})",
204                path.display(),
205                self.allowed_extensions
206            );
207        }
208
209        Ok(FlowValidationReport {
210            root: path.to_path_buf(),
211            flows,
212        })
213    }
214}
215
216fn parse_flow_document(path: &Path, raw: &str) -> Result<FlowDocument> {
217    if path
218        .extension()
219        .is_some_and(|ext| ext == "json" || ext == "JSON")
220    {
221        serde_json::from_str(raw).with_context(|| {
222            format!(
223                "failed to parse flow JSON {}",
224                path.file_name()
225                    .and_then(|p| p.to_str())
226                    .unwrap_or("<unknown>")
227            )
228        })
229    } else {
230        serde_yaml::from_str(raw).or_else(|yaml_err| {
231            serde_json::from_str(raw).map_err(|json_err| {
232                anyhow::anyhow!(
233                    "failed to parse flow file {} as YAML ({yaml_err}) or JSON ({json_err})",
234                    path.display()
235                )
236            })
237        })
238    }
239}
240
241fn validate_flow(flow: &FlowDocument, require_schema: bool) -> Result<()> {
242    if flow.id.trim().is_empty() {
243        bail!("flow id must not be empty");
244    }
245    if require_schema && flow.schema.is_none() {
246        bail!("flow '{}' must declare a schema", flow.id);
247    }
248    if flow.nodes.is_empty() {
249        bail!("flow '{}' must declare at least one node", flow.id);
250    }
251    let mut seen_ids = HashSet::new();
252    for node in &flow.nodes {
253        if node.id.trim().is_empty() {
254            bail!("flow '{}' contains a node with an empty id", flow.id);
255        }
256        if node.kind.trim().is_empty() {
257            bail!(
258                "flow '{}' node '{}' must declare a type/kind",
259                flow.id,
260                node.id
261            );
262        }
263        if !seen_ids.insert(node.id.clone()) {
264            bail!(
265                "flow '{}' contains duplicate node id '{}'",
266                flow.id,
267                node.id
268            );
269        }
270        if let Some(metadata) = &node.metadata {
271            if !metadata.is_object() {
272                bail!(
273                    "flow '{}' node '{}' metadata must be a JSON object",
274                    flow.id,
275                    node.id
276                );
277            }
278        }
279    }
280
281    Ok(())
282}