Skip to main content

greentic_dev/dev_runner/
runner.rs

1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use serde_yaml_bw::Value as YamlValue;
7
8use super::registry::DescribeRegistry;
9use super::schema::{CompiledSchema, compile_schema, validate_yaml_against_compiled_schema};
10use crate::path_safety::normalize_under_root;
11
/// Schema information reported for a single component.
///
/// The default value carries no schema at all.
#[derive(Debug, Default, Clone)]
pub struct ComponentSchema {
    /// Schema source text for the component's nodes, when one is known.
    ///
    /// `FlowValidator` compiles this text and validates nodes against it
    /// whenever the registry does not supply a schema of its own.
    pub node_schema: Option<String>,
}
16
17pub trait ComponentDescriber {
18    fn describe(&self, component: &str) -> Result<ComponentSchema, String>;
19}
20
21#[derive(Debug, Clone)]
22pub struct StaticComponentDescriber {
23    schemas: HashMap<String, ComponentSchema>,
24    fallback: ComponentSchema,
25}
26
27impl StaticComponentDescriber {
28    pub fn new() -> Self {
29        Self {
30            schemas: HashMap::new(),
31            fallback: ComponentSchema::default(),
32        }
33    }
34
35    pub fn with_fallback(mut self, fallback_schema: ComponentSchema) -> Self {
36        self.fallback = fallback_schema;
37        self
38    }
39
40    pub fn register_schema<S: Into<String>>(
41        &mut self,
42        component: S,
43        schema: ComponentSchema,
44    ) -> &mut Self {
45        self.schemas.insert(component.into(), schema);
46        self
47    }
48}
49
50impl ComponentDescriber for StaticComponentDescriber {
51    fn describe(&self, component: &str) -> Result<ComponentSchema, String> {
52        if let Some(schema) = self.schemas.get(component) {
53            Ok(schema.clone())
54        } else {
55            Ok(self.fallback.clone())
56        }
57    }
58}
59
60impl Default for StaticComponentDescriber {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66pub struct FlowValidator<D> {
67    describer: D,
68    registry: DescribeRegistry,
69}
70
71#[derive(Clone)]
72struct ComponentValidationPlan {
73    schema_json: Option<String>,
74    schema_id: Option<String>,
75    defaults: Option<YamlValue>,
76    compiled_schema: Option<Arc<CompiledSchema>>,
77}
78
79#[derive(Clone, Debug)]
80pub struct ValidatedNode {
81    pub component: String,
82    pub node_config: YamlValue,
83    pub schema_json: Option<String>,
84    pub schema_id: Option<String>,
85    pub defaults: Option<YamlValue>,
86}
87
88impl<D> FlowValidator<D>
89where
90    D: ComponentDescriber,
91{
92    pub fn new(describer: D, registry: DescribeRegistry) -> Self {
93        Self {
94            describer,
95            registry,
96        }
97    }
98
99    pub fn validate_file<P>(&self, path: P) -> Result<Vec<ValidatedNode>, FlowValidationError>
100    where
101        P: AsRef<Path>,
102    {
103        let path_ref = path.as_ref();
104        let root = std::env::current_dir()
105            .map_err(|error| FlowValidationError::Io {
106                path: path_ref.to_path_buf(),
107                error,
108            })?
109            .canonicalize()
110            .map_err(|error| FlowValidationError::Io {
111                path: path_ref.to_path_buf(),
112                error,
113            })?;
114        let safe =
115            normalize_under_root(&root, path_ref).map_err(|error| FlowValidationError::Io {
116                path: path_ref.to_path_buf(),
117                error: std::io::Error::other(error.to_string()),
118            })?;
119        let source = fs::read_to_string(&safe)
120            .map_err(|error| FlowValidationError::Io { path: safe, error })?;
121        self.validate_str(&source)
122    }
123
124    pub fn validate_str(
125        &self,
126        yaml_source: &str,
127    ) -> Result<Vec<ValidatedNode>, FlowValidationError> {
128        let document: YamlValue = serde_yaml_bw::from_str(yaml_source).map_err(|error| {
129            FlowValidationError::YamlParse {
130                error: error.to_string(),
131            }
132        })?;
133        self.validate_document(&document)
134    }
135
136    pub fn validate_document(
137        &self,
138        document: &YamlValue,
139    ) -> Result<Vec<ValidatedNode>, FlowValidationError> {
140        let nodes = match nodes_from_document(document) {
141            Some(nodes) => nodes,
142            None => {
143                return Err(FlowValidationError::MissingNodes);
144            }
145        };
146
147        let mut validated_nodes = Vec::with_capacity(nodes.len());
148        let mut schema_cache: HashMap<String, Arc<CompiledSchema>> = HashMap::new();
149        let mut component_plan_cache: HashMap<String, ComponentValidationPlan> = HashMap::new();
150
151        for (index, node) in nodes.iter().enumerate() {
152            let node_mapping = match node.as_mapping() {
153                Some(mapping) => mapping,
154                None => {
155                    return Err(FlowValidationError::NodeNotMapping { index });
156                }
157            };
158
159            let component = component_name(node_mapping)
160                .ok_or(FlowValidationError::MissingComponent { index })?;
161
162            if !component_plan_cache.contains_key(component) {
163                let schema = self.describer.describe(component).map_err(|error| {
164                    FlowValidationError::DescribeFailed {
165                        component: component.to_owned(),
166                        error,
167                    }
168                })?;
169                let schema_json = self
170                    .registry
171                    .get_schema(component)
172                    .map(|schema| schema.to_owned())
173                    .or_else(|| schema.node_schema.clone());
174                let compiled_schema = if let Some(schema_json) = schema_json.as_deref() {
175                    if let Some(compiled) = schema_cache.get(schema_json) {
176                        Some(Arc::clone(compiled))
177                    } else {
178                        let compiled = compile_schema(schema_json).map_err(|message| {
179                            FlowValidationError::SchemaValidation {
180                                component: component.to_owned(),
181                                index,
182                                message,
183                            }
184                        })?;
185                        let compiled = Arc::new(compiled);
186                        schema_cache.insert(schema_json.to_owned(), Arc::clone(&compiled));
187                        Some(compiled)
188                    }
189                } else {
190                    None
191                };
192                let schema_id = compiled_schema
193                    .as_ref()
194                    .and_then(|compiled| compiled.schema_id.clone());
195                let defaults = self.registry.get_defaults(component).cloned();
196                component_plan_cache.insert(
197                    component.to_owned(),
198                    ComponentValidationPlan {
199                        schema_json,
200                        schema_id,
201                        defaults,
202                        compiled_schema,
203                    },
204                );
205            }
206
207            let plan = component_plan_cache
208                .get(component)
209                .expect("component plan cache must contain computed entry");
210            if let Some(compiled) = plan.compiled_schema.as_deref() {
211                validate_yaml_against_compiled_schema(node, &compiled.validator).map_err(
212                    |message| FlowValidationError::SchemaValidation {
213                        component: component.to_owned(),
214                        index,
215                        message,
216                    },
217                )?;
218            }
219
220            validated_nodes.push(ValidatedNode {
221                component: component.to_owned(),
222                node_config: node.clone(),
223                schema_json: plan.schema_json.clone(),
224                schema_id: plan.schema_id.clone(),
225                defaults: plan.defaults.clone(),
226            });
227        }
228
229        Ok(validated_nodes)
230    }
231}
232
233fn nodes_from_document(document: &YamlValue) -> Option<&Vec<YamlValue>> {
234    if let Some(sequence) = document.as_sequence() {
235        return Some(&**sequence);
236    }
237
238    let mapping = document.as_mapping()?;
239    mapping
240        .get("nodes")
241        .and_then(|value| value.as_sequence().map(|sequence| &**sequence))
242}
243
244fn component_name(mapping: &serde_yaml_bw::Mapping) -> Option<&str> {
245    mapping
246        .get("component")
247        .and_then(|value| value.as_str())
248        .or_else(|| mapping.get("type").and_then(|value| value.as_str()))
249}
250
/// Reasons a flow can fail validation.
#[derive(Debug)]
pub enum FlowValidationError {
    /// A filesystem step failed while loading a flow file: resolving the
    /// working directory, normalizing the path, or reading the file.
    Io {
        /// Path the failure is reported against.
        path: PathBuf,
        /// Underlying I/O error.
        error: std::io::Error,
    },
    /// The flow source is not valid YAML.
    YamlParse {
        /// Parser error rendered as text.
        error: String,
    },
    /// The document is neither a sequence of nodes nor a mapping with a
    /// `nodes` sequence.
    MissingNodes,
    /// A node is not a YAML mapping.
    NodeNotMapping {
        /// Zero-based position of the node in the node list.
        index: usize,
    },
    /// A node has no string `component` or `type` entry.
    MissingComponent {
        /// Zero-based position of the node in the node list.
        index: usize,
    },
    /// The describer could not describe a component.
    DescribeFailed {
        /// Name of the component being described.
        component: String,
        /// Message returned by the describer.
        error: String,
    },
    /// A component's schema failed to compile, or a node did not satisfy it.
    SchemaValidation {
        /// Name of the component whose schema was involved.
        component: String,
        /// Zero-based position of the node being processed.
        index: usize,
        /// Message produced by schema compilation or validation.
        message: String,
    },
}