Skip to main content

code0_flow/flow_definition/
mod.rs

1mod error;
2mod feature;
3
4use crate::flow_definition::error::ReaderError;
5use crate::flow_definition::feature::Feature;
6use crate::flow_definition::feature::version::HasVersion;
7use serde::de::DeserializeOwned;
8use std::fs;
9use std::path::Path;
10use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition};
11use walkdir::WalkDir;
12
13pub struct Reader {
14    should_break: bool,
15    accepted_features: Vec<String>,
16    accepted_version: Option<String>,
17    path: String,
18}
19
20impl Reader {
21    pub fn configure(
22        path: String,
23        should_break: bool,
24        accepted_features: Vec<String>,
25        accepted_version: Option<String>,
26    ) -> Self {
27        Self {
28            should_break,
29            accepted_features,
30            accepted_version,
31            path,
32        }
33    }
34
35    pub fn read_features(&self) -> Result<Vec<Feature>, ReaderError> {
36        let definitions = Path::new(&self.path);
37
38        match self.read_feature_content(definitions) {
39            Ok(features) => {
40                log::info!(
41                    "Loaded {:?} feature/s",
42                    &features
43                        .iter()
44                        .map(|f| f.name.clone())
45                        .collect::<Vec<String>>()
46                );
47
48                log::debug!(
49                    "Found FlowTypes {:?}",
50                    &features
51                        .iter()
52                        .flat_map(|f| f.flow_types.iter().map(|t| t.identifier.clone()))
53                        .collect::<Vec<String>>()
54                );
55
56                log::debug!(
57                    "Found DataTypes {:?}",
58                    &features
59                        .iter()
60                        .flat_map(|f| f.data_types.iter().map(|t| t.identifier.clone()))
61                        .collect::<Vec<String>>()
62                );
63
64                log::debug!(
65                    "Found RuntimeFunctions {:?}",
66                    &features
67                        .iter()
68                        .flat_map(|f| f.runtime_functions.iter().map(|t| t.runtime_name.clone()))
69                        .collect::<Vec<String>>()
70                );
71
72                Ok(features)
73            }
74            Err(err) => {
75                log::error!("Failed to read feature/s from {}, {:?}", &self.path, err);
76                Err(ReaderError::ReadFeatureError {
77                    path: self.path.to_string(),
78                    source: Box::new(err),
79                })
80            }
81        }
82    }
83
84    fn read_feature_content(&self, dir: &Path) -> Result<Vec<Feature>, ReaderError> {
85        let mut features: Vec<Feature> = Vec::new();
86
87        let readdir = fs::read_dir(dir).map_err(|err| {
88            log::error!("Failed to read directory {}: {:?}", dir.display(), err);
89            ReaderError::ReadDirectoryError {
90                path: dir.to_path_buf(),
91                error: err,
92            }
93        })?;
94
95        for entry_result in readdir {
96            let entry = match entry_result {
97                Ok(entry) => entry,
98                Err(err) => {
99                    log::error!("Failed to read directory entry: {:?}", err);
100                    return Err(ReaderError::DirectoryEntryError(err));
101                }
102            };
103
104            let path = entry.path();
105
106            if !path.is_dir() {
107                continue;
108            }
109
110            let feature_name = path
111                .file_name()
112                .unwrap_or_default()
113                .to_string_lossy()
114                .to_string();
115
116            if !self.accepted_features.is_empty() && !self.accepted_features.contains(&feature_name)
117            {
118                log::info!("Skipping not accepted feature: {}", feature_name);
119                continue;
120            }
121
122            let data_types = match self
123                .load_definitions_for_feature::<DefinitionDataType>(&path, "data_types")?
124            {
125                Some(v) => v,
126                None => continue,
127            };
128
129            let flow_types =
130                match self.load_definitions_for_feature::<FlowType>(&path, "flow_types")? {
131                    Some(v) => v,
132                    None => continue,
133                };
134
135            let runtime_functions = match self
136                .load_definitions_for_feature::<RuntimeFunctionDefinition>(
137                    &path,
138                    "runtime_functions",
139                )? {
140                Some(v) => v,
141                None => continue,
142            };
143
144            let functions = match self
145                .load_definitions_for_feature::<FunctionDefinition>(&path, "functions")?
146            {
147                Some(v) => v,
148                None => continue,
149            };
150
151            let feature = Feature {
152                name: feature_name,
153                data_types,
154                flow_types,
155                runtime_functions,
156                functions,
157            };
158
159            features.push(feature);
160        }
161
162        Ok(features)
163    }
164
165    fn load_definitions_for_feature<T>(
166        &self,
167        feature_dir: &Path,
168        sub_dir: &str,
169    ) -> Result<Option<Vec<T>>, ReaderError>
170    where
171        T: DeserializeOwned + HasVersion,
172    {
173        let dir = feature_dir.join(sub_dir);
174
175        let raw: Vec<T> = match self.collect_definitions::<T>(&dir) {
176            Ok(v) => v,
177            Err(err) => {
178                if self.should_break {
179                    return Err(ReaderError::ReadFeatureError {
180                        path: dir.to_string_lossy().to_string(),
181                        source: Box::new(err),
182                    });
183                } else {
184                    // Skip this feature if we shouldn't break on error
185                    return Ok(None);
186                }
187            }
188        };
189
190        let items = raw
191            .into_iter()
192            .filter(|v| v.is_accepted(&self.accepted_version))
193            .collect();
194
195        Ok(Some(items))
196    }
197
198    fn collect_definitions<T>(&self, dir: &Path) -> Result<Vec<T>, ReaderError>
199    where
200        T: DeserializeOwned,
201    {
202        let mut definitions = Vec::new();
203
204        if !dir.exists() {
205            return Ok(definitions);
206        }
207
208        for entry in WalkDir::new(dir).into_iter().filter_map(Result::ok) {
209            let path = entry.path();
210
211            if path.is_file() && path.extension().is_some_and(|ext| ext == "json") {
212                let content = match fs::read_to_string(path) {
213                    Ok(content) => content,
214                    Err(err) => {
215                        log::error!("Failed to read file {}: {}", path.display(), err);
216                        return Err(ReaderError::ReadFileError {
217                            path: path.to_path_buf(),
218                            error: err,
219                        });
220                    }
221                };
222
223                match serde_json::from_str::<T>(&content) {
224                    Ok(def) => definitions.push(def),
225                    Err(e) => {
226                        if self.should_break {
227                            log::error!("Failed to parse JSON in file {}: {:?}", path.display(), e);
228                            return Err(ReaderError::JsonError {
229                                path: path.to_path_buf(),
230                                error: e,
231                            });
232                        } else {
233                            log::warn!("Skipping invalid JSON file {}: {:?}", path.display(), e);
234                        }
235                    }
236                }
237            }
238        }
239
240        Ok(definitions)
241    }
242}