code0_flow/flow_definition/
mod.rs

1mod error;
2mod feature;
3
4use crate::flow_definition::error::ReaderError;
5use crate::flow_definition::feature::Feature;
6use serde::de::DeserializeOwned;
7use std::fs;
8use std::path::Path;
9use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition};
10use walkdir::WalkDir;
11use crate::flow_definition::feature::version::HasVersion;
12
13pub struct Reader {
14    should_break: bool,
15    accepted_features: Vec<String>,
16    accepted_version: Option<String>,
17    path: String,
18}
19
20impl Reader {
21    pub fn configure(
22        path: String,
23        should_break: bool,
24        accepted_features: Vec<String>,
25        accepted_version: Option<String>,
26    ) -> Self {
27        Self {
28            should_break,
29            accepted_features,
30            accepted_version,
31            path,
32        }
33    }
34
35    pub fn read_features(&self) -> Result<Vec<Feature>, ReaderError> {
36        let definitions = Path::new(&self.path);
37
38        match self.read_feature_content(definitions) {
39            Ok(features) => {
40                log::info!(
41                    "Loaded {:?} feature/s",
42                    &features
43                        .iter()
44                        .map(|f| f.name.clone())
45                        .collect::<Vec<String>>()
46                );
47
48                log::debug!(
49                    "Found FlowTypes {:?}",
50                    &features
51                        .iter()
52                        .flat_map(|f| f.flow_types.iter().map(|t| t.identifier.clone()))
53                        .collect::<Vec<String>>()
54                );
55
56                log::debug!(
57                    "Found DataTypes {:?}",
58                    &features
59                        .iter()
60                        .flat_map(|f| f.data_types.iter().map(|t| t.identifier.clone()))
61                        .collect::<Vec<String>>()
62                );
63
64                log::debug!(
65                    "Found Functions {:?}",
66                    &features
67                        .iter()
68                        .flat_map(|f| f.functions.iter().map(|t| t.runtime_name.clone()))
69                        .collect::<Vec<String>>()
70                );
71
72                Ok(features)
73            }
74            Err(err) => {
75                log::error!("Failed to read feature/s from {}, {:?}", &self.path, err);
76                Err(ReaderError::ReadFeatureError {
77                    path: self.path.to_string(),
78                    source: Box::new(err),
79                })
80            }
81        }
82    }
83
84    fn read_feature_content(&self, dir: &Path) -> Result<Vec<Feature>, ReaderError> {
85        let mut features: Vec<Feature> = Vec::new();
86
87        let readdir = fs::read_dir(dir).map_err(|err| {
88            log::error!("Failed to read directory {}: {:?}", dir.display(), err);
89            ReaderError::ReadDirectoryError {
90                path: dir.to_path_buf(),
91                error: err,
92            }
93        })?;
94
95        for entry_result in readdir {
96            let entry = match entry_result {
97                Ok(entry) => entry,
98                Err(err) => {
99                    log::error!("Failed to read directory entry: {:?}", err);
100                    return Err(ReaderError::DirectoryEntryError(err));
101                }
102            };
103
104            let path = entry.path();
105
106            if !path.is_dir() {
107                continue;
108            }
109
110            let feature_name = path
111                .file_name()
112                .unwrap_or_default()
113                .to_string_lossy()
114                .to_string();
115
116            if !self.accepted_features.is_empty() && !self.accepted_features.contains(&feature_name)
117            {
118                log::info!("Skipping not accepted feature: {}", feature_name);
119                continue;
120            }
121
122            let data_types = match self
123                .load_definitions_for_feature::<DefinitionDataType>(&path, "data_type")?
124            {
125                Some(v) => v,
126                None => continue,
127            };
128
129            let flow_types =
130                match self.load_definitions_for_feature::<FlowType>(&path, "flow_type")? {
131                    Some(v) => v,
132                    None => continue,
133                };
134
135            let functions = match self.load_definitions_for_feature::<RuntimeFunctionDefinition>(
136                &path,
137                "runtime_definition",
138            )? {
139                Some(v) => v,
140                None => continue,
141            };
142
143            let feature = Feature {
144                name: feature_name,
145                data_types,
146                flow_types,
147                functions,
148            };
149
150            features.push(feature);
151        }
152
153        Ok(features)
154    }
155
156    fn load_definitions_for_feature<T>(
157        &self,
158        feature_dir: &Path,
159        sub_dir: &str,
160    ) -> Result<Option<Vec<T>>, ReaderError>
161    where
162        T: DeserializeOwned + HasVersion,
163    {
164        let dir = feature_dir.join(sub_dir);
165
166        let raw: Vec<T> = match self.collect_definitions::<T>(&dir) {
167            Ok(v) => v,
168            Err(err) => {
169                if self.should_break {
170                    return Err(ReaderError::ReadFeatureError {
171                        path: dir.to_string_lossy().to_string(),
172                        source: Box::new(err),
173                    });
174                } else {
175                    // Skip this feature if we shouldn't break on error
176                    return Ok(None);
177                }
178            }
179        };
180
181        let items = raw
182            .into_iter()
183            .filter(|v| v.is_accepted(&self.accepted_version))
184            .collect();
185
186        Ok(Some(items))
187    }
188
189    fn collect_definitions<T>(&self, dir: &Path) -> Result<Vec<T>, ReaderError>
190    where
191        T: DeserializeOwned,
192    {
193        let mut definitions = Vec::new();
194
195        if !dir.exists() {
196            return Ok(definitions);
197        }
198
199        for entry in WalkDir::new(dir).into_iter().filter_map(Result::ok) {
200            let path = entry.path();
201
202            if path.is_file() && path.extension().is_some_and(|ext| ext == "json") {
203                let content = match fs::read_to_string(path) {
204                    Ok(content) => content,
205                    Err(err) => {
206                        log::error!("Failed to read file {}: {}", path.display(), err);
207                        return Err(ReaderError::ReadFileError {
208                            path: path.to_path_buf(),
209                            error: err,
210                        });
211                    }
212                };
213
214                match serde_json::from_str::<T>(&content) {
215                    Ok(def) => definitions.push(def),
216                    Err(e) => {
217                        if self.should_break {
218                            log::error!("Failed to parse JSON in file {}: {:?}", path.display(), e);
219                            return Err(ReaderError::JsonError {
220                                path: path.to_path_buf(),
221                                error: e,
222                            });
223                        } else {
224                            log::warn!("Skipping invalid JSON file {}: {:?}", path.display(), e);
225                        }
226                    }
227                }
228            }
229        }
230
231        Ok(definitions)
232    }
233}