1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use serde_yaml_bw::Value as YamlValue;
7
8use super::registry::DescribeRegistry;
9use super::schema::{CompiledSchema, compile_schema, validate_yaml_against_compiled_schema};
10use crate::path_safety::normalize_under_root;
11
/// Schema information reported for a single component.
#[derive(Debug, Clone, Default)]
pub struct ComponentSchema {
    /// Schema source text for the component's node configuration, if the
    /// describer has one. `None` (the `Default`) means no schema is known.
    // NOTE(review): presumably JSON Schema text — confirm against `compile_schema`.
    pub node_schema: Option<String>,
}
16
17pub trait ComponentDescriber {
18 fn describe(&self, component: &str) -> Result<ComponentSchema, String>;
19}
20
21#[derive(Debug, Clone)]
22pub struct StaticComponentDescriber {
23 schemas: HashMap<String, ComponentSchema>,
24 fallback: ComponentSchema,
25}
26
27impl StaticComponentDescriber {
28 pub fn new() -> Self {
29 Self {
30 schemas: HashMap::new(),
31 fallback: ComponentSchema::default(),
32 }
33 }
34
35 pub fn with_fallback(mut self, fallback_schema: ComponentSchema) -> Self {
36 self.fallback = fallback_schema;
37 self
38 }
39
40 pub fn register_schema<S: Into<String>>(
41 &mut self,
42 component: S,
43 schema: ComponentSchema,
44 ) -> &mut Self {
45 self.schemas.insert(component.into(), schema);
46 self
47 }
48}
49
50impl ComponentDescriber for StaticComponentDescriber {
51 fn describe(&self, component: &str) -> Result<ComponentSchema, String> {
52 if let Some(schema) = self.schemas.get(component) {
53 Ok(schema.clone())
54 } else {
55 Ok(self.fallback.clone())
56 }
57 }
58}
59
60impl Default for StaticComponentDescriber {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66pub struct FlowValidator<D> {
67 describer: D,
68 registry: DescribeRegistry,
69}
70
71#[derive(Clone)]
72struct ComponentValidationPlan {
73 schema_json: Option<String>,
74 schema_id: Option<String>,
75 defaults: Option<YamlValue>,
76 compiled_schema: Option<Arc<CompiledSchema>>,
77}
78
79#[derive(Clone, Debug)]
80pub struct ValidatedNode {
81 pub component: String,
82 pub node_config: YamlValue,
83 pub schema_json: Option<String>,
84 pub schema_id: Option<String>,
85 pub defaults: Option<YamlValue>,
86}
87
88impl<D> FlowValidator<D>
89where
90 D: ComponentDescriber,
91{
92 pub fn new(describer: D, registry: DescribeRegistry) -> Self {
93 Self {
94 describer,
95 registry,
96 }
97 }
98
99 pub fn validate_file<P>(&self, path: P) -> Result<Vec<ValidatedNode>, FlowValidationError>
100 where
101 P: AsRef<Path>,
102 {
103 let path_ref = path.as_ref();
104 let root = std::env::current_dir()
105 .map_err(|error| FlowValidationError::Io {
106 path: path_ref.to_path_buf(),
107 error,
108 })?
109 .canonicalize()
110 .map_err(|error| FlowValidationError::Io {
111 path: path_ref.to_path_buf(),
112 error,
113 })?;
114 let safe =
115 normalize_under_root(&root, path_ref).map_err(|error| FlowValidationError::Io {
116 path: path_ref.to_path_buf(),
117 error: std::io::Error::other(error.to_string()),
118 })?;
119 let source = fs::read_to_string(&safe)
120 .map_err(|error| FlowValidationError::Io { path: safe, error })?;
121 self.validate_str(&source)
122 }
123
124 pub fn validate_str(
125 &self,
126 yaml_source: &str,
127 ) -> Result<Vec<ValidatedNode>, FlowValidationError> {
128 let document: YamlValue = serde_yaml_bw::from_str(yaml_source).map_err(|error| {
129 FlowValidationError::YamlParse {
130 error: error.to_string(),
131 }
132 })?;
133 self.validate_document(&document)
134 }
135
136 pub fn validate_document(
137 &self,
138 document: &YamlValue,
139 ) -> Result<Vec<ValidatedNode>, FlowValidationError> {
140 let nodes = match nodes_from_document(document) {
141 Some(nodes) => nodes,
142 None => {
143 return Err(FlowValidationError::MissingNodes);
144 }
145 };
146
147 let mut validated_nodes = Vec::with_capacity(nodes.len());
148 let mut schema_cache: HashMap<String, Arc<CompiledSchema>> = HashMap::new();
149 let mut component_plan_cache: HashMap<String, ComponentValidationPlan> = HashMap::new();
150
151 for (index, node) in nodes.iter().enumerate() {
152 let node_mapping = match node.as_mapping() {
153 Some(mapping) => mapping,
154 None => {
155 return Err(FlowValidationError::NodeNotMapping { index });
156 }
157 };
158
159 let component = component_name(node_mapping)
160 .ok_or(FlowValidationError::MissingComponent { index })?;
161
162 if !component_plan_cache.contains_key(component) {
163 let schema = self.describer.describe(component).map_err(|error| {
164 FlowValidationError::DescribeFailed {
165 component: component.to_owned(),
166 error,
167 }
168 })?;
169 let schema_json = self
170 .registry
171 .get_schema(component)
172 .map(|schema| schema.to_owned())
173 .or_else(|| schema.node_schema.clone());
174 let compiled_schema = if let Some(schema_json) = schema_json.as_deref() {
175 if let Some(compiled) = schema_cache.get(schema_json) {
176 Some(Arc::clone(compiled))
177 } else {
178 let compiled = compile_schema(schema_json).map_err(|message| {
179 FlowValidationError::SchemaValidation {
180 component: component.to_owned(),
181 index,
182 message,
183 }
184 })?;
185 let compiled = Arc::new(compiled);
186 schema_cache.insert(schema_json.to_owned(), Arc::clone(&compiled));
187 Some(compiled)
188 }
189 } else {
190 None
191 };
192 let schema_id = compiled_schema
193 .as_ref()
194 .and_then(|compiled| compiled.schema_id.clone());
195 let defaults = self.registry.get_defaults(component).cloned();
196 component_plan_cache.insert(
197 component.to_owned(),
198 ComponentValidationPlan {
199 schema_json,
200 schema_id,
201 defaults,
202 compiled_schema,
203 },
204 );
205 }
206
207 let plan = component_plan_cache
208 .get(component)
209 .expect("component plan cache must contain computed entry");
210 if let Some(compiled) = plan.compiled_schema.as_deref() {
211 validate_yaml_against_compiled_schema(node, &compiled.validator).map_err(
212 |message| FlowValidationError::SchemaValidation {
213 component: component.to_owned(),
214 index,
215 message,
216 },
217 )?;
218 }
219
220 validated_nodes.push(ValidatedNode {
221 component: component.to_owned(),
222 node_config: node.clone(),
223 schema_json: plan.schema_json.clone(),
224 schema_id: plan.schema_id.clone(),
225 defaults: plan.defaults.clone(),
226 });
227 }
228
229 Ok(validated_nodes)
230 }
231}
232
233fn nodes_from_document(document: &YamlValue) -> Option<&Vec<YamlValue>> {
234 if let Some(sequence) = document.as_sequence() {
235 return Some(&**sequence);
236 }
237
238 let mapping = document.as_mapping()?;
239 mapping
240 .get("nodes")
241 .and_then(|value| value.as_sequence().map(|sequence| &**sequence))
242}
243
244fn component_name(mapping: &serde_yaml_bw::Mapping) -> Option<&str> {
245 mapping
246 .get("component")
247 .and_then(|value| value.as_str())
248 .or_else(|| mapping.get("type").and_then(|value| value.as_str()))
249}
250
/// Reasons a flow file, string, or document can fail validation.
#[derive(Debug)]
pub enum FlowValidationError {
    /// A filesystem or path-resolution step failed for `path`.
    Io { path: PathBuf, error: std::io::Error },
    /// The input text could not be parsed as YAML; `error` is the parser's message.
    YamlParse { error: String },
    /// The document is neither a sequence of nodes nor a mapping with a
    /// `nodes` sequence.
    MissingNodes,
    /// The node at position `index` is not a mapping.
    NodeNotMapping { index: usize },
    /// The node at position `index` has no string `component` or `type` entry.
    MissingComponent { index: usize },
    /// The component describer returned `error` for `component`.
    DescribeFailed { component: String, error: String },
    /// The schema for `component` could not be compiled, or the node at
    /// position `index` did not satisfy it; `message` holds the details.
    SchemaValidation {
        component: String,
        index: usize,
        message: String,
    },
}