code0_flow/flow_definition/
mod.rs

1mod error;
2mod feature;
3
4use crate::flow_definition::error::ReaderError;
5use crate::flow_definition::feature::Feature;
6use crate::flow_definition::feature::version::HasVersion;
7use serde::de::DeserializeOwned;
8use std::fs;
9use std::path::Path;
10use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition, Version};
11use walkdir::WalkDir;
12
13pub struct Reader {
14    should_break: bool,
15    accepted_features: Vec<String>,
16    accepted_version: Option<Version>,
17    path: String,
18}
19
20impl Reader {
21    pub fn configure(
22        path: String,
23        should_break: bool,
24        accepted_features: Vec<String>,
25        accepted_version: Option<Version>,
26    ) -> Self {
27        Self {
28            should_break,
29            accepted_features,
30            accepted_version,
31            path,
32        }
33    }
34
35    pub fn read_features(&self) -> Result<Vec<Feature>, ReaderError> {
36        let definitions = Path::new(&self.path);
37
38        match self.read_feature_content(definitions) {
39            Ok(features) => {
40                log::info!(
41                    "Loaded {:?} feature/s",
42                    &features
43                        .iter()
44                        .map(|f| f.name.clone())
45                        .collect::<Vec<String>>()
46                );
47
48                log::debug!(
49                    "Found FlowTypes {:?}",
50                    &features
51                        .iter()
52                        .flat_map(|f| f.flow_types.iter().map(|t| t.identifier.clone()))
53                        .collect::<Vec<String>>()
54                );
55
56                log::debug!(
57                    "Found DataTypes {:?}",
58                    &features
59                        .iter()
60                        .flat_map(|f| f.data_types.iter().map(|t| t.identifier.clone()))
61                        .collect::<Vec<String>>()
62                );
63
64                log::debug!(
65                    "Found Functions {:?}",
66                    &features
67                        .iter()
68                        .flat_map(|f| f.functions.iter().map(|t| t.runtime_name.clone()))
69                        .collect::<Vec<String>>()
70                );
71
72                Ok(features)
73            }
74            Err(err) => {
75                log::error!("Failed to read feature/s from {}, {:?}", &self.path, err);
76                Err(ReaderError::ReadFeatureError {
77                    path: self.path.to_string(),
78                    source: Box::new(err),
79                })
80            }
81        }
82    }
83
84    fn read_feature_content(&self, dir: &Path) -> Result<Vec<Feature>, ReaderError> {
85        let mut features: Vec<Feature> = Vec::new();
86
87        let readdir = fs::read_dir(dir).map_err(|err| {
88            log::error!("Failed to read directory {}: {:?}", dir.display(), err);
89            ReaderError::ReadDirectoryError {
90                path: dir.to_path_buf(),
91                error: err,
92            }
93        })?;
94
95        for entry_result in readdir {
96            let entry = match entry_result {
97                Ok(entry) => entry,
98                Err(err) => {
99                    log::error!("Failed to read directory entry: {:?}", err);
100                    return Err(ReaderError::DirectoryEntryError(err));
101                }
102            };
103
104            let path = entry.path();
105
106            if !path.is_dir() {
107                continue;
108            }
109
110            let feature_name = path
111                .file_name()
112                .unwrap_or_default()
113                .to_string_lossy()
114                .to_string();
115
116            if !self.accepted_features.is_empty() && !self.accepted_features.contains(&feature_name)
117            {
118                log::info!("Skipping not accepted feature: {}", feature_name);
119                continue;
120            }
121
122            let data_types = match self
123                .load_definitions_for_feature::<DefinitionDataType>(&path, "data_type")?
124            {
125                Some(v) => v,
126                None => continue,
127            };
128
129            let flow_types =
130                match self.load_definitions_for_feature::<FlowType>(&path, "flow_type")? {
131                    Some(v) => v,
132                    None => continue,
133                };
134
135            let functions = match self.load_definitions_for_feature::<RuntimeFunctionDefinition>(
136                &path,
137                "runtime_definition",
138            )? {
139                Some(v) => v,
140                None => continue,
141            };
142
143            let feature = Feature {
144                name: feature_name,
145                data_types,
146                flow_types,
147                functions,
148            };
149
150            features.push(feature);
151        }
152
153        Ok(features)
154    }
155
156    fn load_definitions_for_feature<T>(
157        &self,
158        feature_dir: &Path,
159        sub_dir: &str,
160    ) -> Result<Option<Vec<T>>, ReaderError>
161    where
162        T: DeserializeOwned + HasVersion,
163    {
164        let dir = feature_dir.join(sub_dir);
165
166        let raw: Vec<T> = match self.collect_definitions::<T>(&dir) {
167            Ok(v) => v,
168            Err(err) => {
169                if self.should_break {
170                    return Err(ReaderError::ReadFeatureError {
171                        path: dir.to_string_lossy().to_string(),
172                        source: Box::new(err),
173                    });
174                } else {
175                    // Skip this feature if we shouldn't break on error
176                    return Ok(None);
177                }
178            }
179        };
180
181        let items = raw
182            .into_iter()
183            .map(|mut v| {
184                v.normalize_version();
185                v
186            })
187            .filter(|v| v.is_accepted(&self.accepted_version))
188            .collect();
189
190        Ok(Some(items))
191    }
192
193    fn collect_definitions<T>(&self, dir: &Path) -> Result<Vec<T>, ReaderError>
194    where
195        T: DeserializeOwned,
196    {
197        let mut definitions = Vec::new();
198
199        if !dir.exists() {
200            return Ok(definitions);
201        }
202
203        for entry in WalkDir::new(dir).into_iter().filter_map(Result::ok) {
204            let path = entry.path();
205
206            if path.is_file() && path.extension().is_some_and(|ext| ext == "json") {
207                let content = match fs::read_to_string(path) {
208                    Ok(content) => content,
209                    Err(err) => {
210                        log::error!("Failed to read file {}: {}", path.display(), err);
211                        return Err(ReaderError::ReadFileError {
212                            path: path.to_path_buf(),
213                            error: err,
214                        });
215                    }
216                };
217
218                match serde_json::from_str::<T>(&content) {
219                    Ok(def) => definitions.push(def),
220                    Err(e) => {
221                        if self.should_break {
222                            log::error!("Failed to parse JSON in file {}: {:?}", path.display(), e);
223                            return Err(ReaderError::JsonError {
224                                path: path.to_path_buf(),
225                                error: e,
226                            });
227                        } else {
228                            log::warn!("Skipping invalid JSON file {}: {:?}", path.display(), e);
229                        }
230                    }
231                }
232            }
233        }
234
235        Ok(definitions)
236    }
237}