greentic_conformance/
flow_suite.rs1use std::{
2 collections::HashSet,
3 fs,
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use anyhow::{bail, Context, Result};
9use serde::{Deserialize, Serialize};
10use walkdir::WalkDir;
11
12pub struct FlowValidationOptions {
14 pub allowed_extensions: Vec<String>,
15 pub require_schema: bool,
16 validators: Vec<Arc<dyn FlowValidator>>,
17}
18
19impl Clone for FlowValidationOptions {
20 fn clone(&self) -> Self {
21 Self {
22 allowed_extensions: self.allowed_extensions.clone(),
23 require_schema: self.require_schema,
24 validators: self.validators.clone(),
25 }
26 }
27}
28
29impl std::fmt::Debug for FlowValidationOptions {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("FlowValidationOptions")
32 .field("allowed_extensions", &self.allowed_extensions)
33 .field("require_schema", &self.require_schema)
34 .field("validators", &self.validators.len())
35 .finish()
36 }
37}
38
39impl Default for FlowValidationOptions {
40 fn default() -> Self {
41 Self {
42 allowed_extensions: vec!["ygtc".into(), "yaml".into(), "yml".into(), "json".into()],
43 require_schema: true,
44 validators: Vec::new(),
45 }
46 }
47}
48
49pub trait FlowValidator: Send + Sync {
51 fn validate(&self, flow: &FlowDocument) -> Result<()>;
52}
53
54impl<T> FlowValidator for T
55where
56 T: Fn(&FlowDocument) -> Result<()> + Send + Sync,
57{
58 fn validate(&self, flow: &FlowDocument) -> Result<()> {
59 (self)(flow)
60 }
61}
62
63#[derive(Debug, Clone, Deserialize, Serialize)]
65pub struct FlowDocument {
66 pub id: String,
67 #[serde(default)]
68 pub name: Option<String>,
69 #[serde(default)]
70 pub summary: Option<String>,
71 #[serde(default)]
72 pub schema: Option<serde_json::Value>,
73 pub nodes: Vec<FlowNode>,
74}
75
76#[derive(Debug, Clone, Deserialize, Serialize)]
78pub struct FlowNode {
79 pub id: String,
80 #[serde(alias = "type")]
81 pub kind: String,
82 #[serde(default)]
83 pub description: Option<String>,
84 #[serde(default)]
85 pub metadata: Option<serde_json::Value>,
86}
87
88#[derive(Debug, Clone)]
90pub struct FlowValidationReport {
91 pub root: PathBuf,
92 pub flows: Vec<FlowDocument>,
93}
94
95pub fn validate_flow_folder(path: &str) -> Result<FlowValidationReport> {
97 FlowValidationOptions::default().validate_flow_folder(path)
98}
99
100impl FlowValidationOptions {
101 pub fn with_allowed_extensions<I, S>(mut self, extensions: I) -> Self
103 where
104 I: IntoIterator<Item = S>,
105 S: Into<String>,
106 {
107 self.allowed_extensions = extensions
108 .into_iter()
109 .map(|ext| ext.into().to_ascii_lowercase())
110 .collect();
111 self
112 }
113
114 pub fn allow_extension(mut self, extension: impl Into<String>) -> Self {
116 let extension = extension.into().to_ascii_lowercase();
117 if !self.allowed_extensions.iter().any(|ext| ext == &extension) {
118 self.allowed_extensions.push(extension);
119 }
120 self
121 }
122
123 pub fn require_schema(mut self, required: bool) -> Self {
125 self.require_schema = required;
126 self
127 }
128
129 pub fn allow_missing_schema(self) -> Self {
131 self.require_schema(false)
132 }
133
134 pub fn add_validator<V>(mut self, validator: V) -> Self
136 where
137 V: FlowValidator + 'static,
138 {
139 self.validators.push(Arc::new(validator));
140 self
141 }
142
143 pub fn with_validators<I, V>(mut self, validators: I) -> Self
145 where
146 I: IntoIterator<Item = V>,
147 V: FlowValidator + 'static,
148 {
149 self.validators = validators
150 .into_iter()
151 .map(|validator| Arc::new(validator) as Arc<dyn FlowValidator>)
152 .collect();
153 self
154 }
155
156 pub fn validate_flow_folder(&self, path: impl AsRef<Path>) -> Result<FlowValidationReport> {
158 let path = path.as_ref();
159 if !path.exists() {
160 bail!("flow path '{}' does not exist", path.display());
161 }
162
163 let mut flows = Vec::new();
164 let mut any_files = false;
165
166 for entry in WalkDir::new(path).into_iter().flatten() {
167 if !entry.file_type().is_file() {
168 continue;
169 }
170
171 let ext = entry
172 .path()
173 .extension()
174 .and_then(|s| s.to_str())
175 .map(|s| s.to_ascii_lowercase())
176 .unwrap_or_default();
177
178 if !self
179 .allowed_extensions
180 .iter()
181 .any(|allowed| allowed == &ext)
182 {
183 continue;
184 }
185
186 any_files = true;
187
188 let raw = fs::read_to_string(entry.path())
189 .with_context(|| format!("failed to read flow file {}", entry.path().display()))?;
190 let document = parse_flow_document(entry.path(), &raw)?;
191 validate_flow(&document, self.require_schema)
192 .with_context(|| format!("invalid flow document {}", entry.path().display()))?;
193 for validator in &self.validators {
194 validator.validate(&document).with_context(|| {
195 format!("custom validator failed for {}", entry.path().display())
196 })?;
197 }
198 flows.push(document);
199 }
200
201 if !any_files {
202 bail!(
203 "no flow definitions found under '{}' (expected extensions: {:?})",
204 path.display(),
205 self.allowed_extensions
206 );
207 }
208
209 Ok(FlowValidationReport {
210 root: path.to_path_buf(),
211 flows,
212 })
213 }
214}
215
216fn parse_flow_document(path: &Path, raw: &str) -> Result<FlowDocument> {
217 if path
218 .extension()
219 .is_some_and(|ext| ext == "json" || ext == "JSON")
220 {
221 serde_json::from_str(raw).with_context(|| {
222 format!(
223 "failed to parse flow JSON {}",
224 path.file_name()
225 .and_then(|p| p.to_str())
226 .unwrap_or("<unknown>")
227 )
228 })
229 } else {
230 serde_yaml::from_str(raw).or_else(|yaml_err| {
231 serde_json::from_str(raw).map_err(|json_err| {
232 anyhow::anyhow!(
233 "failed to parse flow file {} as YAML ({yaml_err}) or JSON ({json_err})",
234 path.display()
235 )
236 })
237 })
238 }
239}
240
241fn validate_flow(flow: &FlowDocument, require_schema: bool) -> Result<()> {
242 if flow.id.trim().is_empty() {
243 bail!("flow id must not be empty");
244 }
245 if require_schema && flow.schema.is_none() {
246 bail!("flow '{}' must declare a schema", flow.id);
247 }
248 if flow.nodes.is_empty() {
249 bail!("flow '{}' must declare at least one node", flow.id);
250 }
251 let mut seen_ids = HashSet::new();
252 for node in &flow.nodes {
253 if node.id.trim().is_empty() {
254 bail!("flow '{}' contains a node with an empty id", flow.id);
255 }
256 if node.kind.trim().is_empty() {
257 bail!(
258 "flow '{}' node '{}' must declare a type/kind",
259 flow.id,
260 node.id
261 );
262 }
263 if !seen_ids.insert(node.id.clone()) {
264 bail!(
265 "flow '{}' contains duplicate node id '{}'",
266 flow.id,
267 node.id
268 );
269 }
270 if let Some(metadata) = &node.metadata {
271 if !metadata.is_object() {
272 bail!(
273 "flow '{}' node '{}' metadata must be a JSON object",
274 flow.id,
275 node.id
276 );
277 }
278 }
279 }
280
281 Ok(())
282}