code0_flow/flow_definition/
mod.rs1mod error;
2mod feature;
3
4use crate::flow_definition::error::ReaderError;
5use crate::flow_definition::feature::Feature;
6use crate::flow_definition::feature::version::HasVersion;
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use std::fs;
10use std::path::{Path, PathBuf};
11use tucana::shared::{
12 DefinitionDataType, FlowType, FunctionDefinition, Module, ModuleConfigurationDefinition,
13 RuntimeFlowType, RuntimeFunctionDefinition, Translation,
14};
15use walkdir::WalkDir;
16
/// Reads module definitions from a directory tree on disk.
///
/// Build one with [`Reader::configure`], then call `read_features` or
/// `read_modules`.
pub struct Reader {
    /// Root directory that is searched for module directories.
    path: String,
    /// When `true`, the first unreadable or invalid definition file aborts the
    /// read with an error; when `false`, such files are logged and skipped.
    should_break: bool,
    /// Module names that may be loaded; an empty list accepts every module.
    accepted_features: Vec<String>,
    /// Version handed to `HasVersion::is_accepted` for every loaded definition.
    accepted_version: Option<String>,
}
23
24#[derive(Serialize, Deserialize, Clone, Debug, Default)]
25pub struct ModuleConfiguration {
26 pub identifier: String,
27 pub name: Vec<Translation>,
28 pub description: Vec<Translation>,
29 pub documentation: String,
30 pub author: String,
31 pub icon: String,
32 #[serde(default, skip_serializing_if = "String::is_empty")]
33 pub version: String,
34}
35
36#[derive(Clone, Debug, Default)]
37struct LoadedModule {
38 name: String,
39 config: ModuleConfiguration,
40 data_types: Vec<DefinitionDataType>,
41 flow_types: Vec<FlowType>,
42 runtime_flow_types: Vec<RuntimeFlowType>,
43 functions: Vec<FunctionDefinition>,
44 runtime_functions: Vec<RuntimeFunctionDefinition>,
45 configurations: Vec<ModuleConfigurationDefinition>,
46}
47
48impl LoadedModule {
49 fn into_module(mut self) -> Module {
50 if self.config.identifier.is_empty() {
51 self.config.identifier = self.name.clone();
52 }
53
54 Module {
55 identifier: self.config.identifier,
56 name: self.config.name,
57 description: self.config.description,
58 documentation: self.config.documentation,
59 author: self.config.author,
60 icon: self.config.icon,
61 version: self.config.version,
62 flow_types: self.flow_types,
63 runtime_flow_types: self.runtime_flow_types,
64 function_definitions: self.functions,
65 runtime_function_definitions: self.runtime_functions,
66 definition_data_types: self.data_types,
67 configurations: self.configurations,
68 }
69 }
70}
71
72impl Reader {
73 pub fn configure(
74 path: String,
75 should_break: bool,
76 accepted_features: Vec<String>,
77 accepted_version: Option<String>,
78 ) -> Self {
79 Self {
80 should_break,
81 accepted_features,
82 accepted_version,
83 path,
84 }
85 }
86
87 pub fn read_features(&self) -> Result<Vec<Feature>, ReaderError> {
88 let modules = self
89 .read_loaded_modules()
90 .map_err(|err| ReaderError::ReadFeatureError {
91 path: self.path.clone(),
92 source: Box::new(err),
93 })?;
94
95 Ok(modules
96 .into_iter()
97 .map(|module| Feature {
98 name: module.name,
99 data_types: module.data_types,
100 flow_types: module.flow_types,
101 runtime_functions: module.runtime_functions,
102 functions: module.functions,
103 })
104 .collect())
105 }
106
107 pub fn read_modules(&self) -> Result<Vec<Module>, ReaderError> {
108 let modules = self
109 .read_loaded_modules()
110 .map_err(|err| ReaderError::ReadFeatureError {
111 path: self.path.clone(),
112 source: Box::new(err),
113 })?;
114
115 Ok(modules.into_iter().map(LoadedModule::into_module).collect())
116 }
117
118 fn read_loaded_modules(&self) -> Result<Vec<LoadedModule>, ReaderError> {
119 let root = Path::new(&self.path);
120 if !root.exists() || !root.is_dir() {
121 return Err(ReaderError::ReadDirectoryError {
122 path: root.to_path_buf(),
123 error: std::io::Error::new(
124 std::io::ErrorKind::NotFound,
125 format!("Definition path {} does not exist", root.display()),
126 ),
127 });
128 }
129
130 let mut modules = Vec::new();
131 for module_dir in find_module_directories(root) {
132 let module_name = module_name_from_paths(root, &module_dir);
133 if !self.feature_allowed(&module_name, &module_dir) {
134 continue;
135 }
136
137 let mut module = LoadedModule {
138 name: module_name,
139 ..Default::default()
140 };
141
142 let module_file = module_dir.join("module.json");
143 if module_file.is_file() {
144 match read_json_file::<ModuleConfiguration>(&module_file) {
145 Ok(config) => module.config = config,
146 Err(err) if self.should_break => return Err(err),
147 Err(err) => log::warn!(
148 "Skipping invalid module definition {}: {:?}",
149 module_file.display(),
150 err
151 ),
152 }
153 }
154
155 let entries =
156 fs::read_dir(&module_dir).map_err(|error| ReaderError::ReadDirectoryError {
157 path: module_dir.clone(),
158 error,
159 })?;
160
161 for entry in entries {
162 let entry = entry.map_err(ReaderError::DirectoryEntryError)?;
163 let file_type = entry
164 .file_type()
165 .map_err(ReaderError::DirectoryEntryError)?;
166 if !file_type.is_dir() {
167 continue;
168 }
169
170 let path = entry.path();
171 let dir_name = entry.file_name().to_string_lossy().to_string();
172
173 match dir_name.as_str() {
174 "flow_type" | "flow_types" => {
175 module
176 .flow_types
177 .extend(load_json_dir::<FlowType>(&path, self.should_break)?);
178 }
179 "runtime_flow_type" | "runtime_flow_types" => {
180 module
181 .runtime_flow_types
182 .extend(load_json_dir::<RuntimeFlowType>(&path, self.should_break)?);
183 }
184 "data_type" | "data_types" => {
185 module
186 .data_types
187 .extend(load_json_dir::<DefinitionDataType>(
188 &path,
189 self.should_break,
190 )?);
191 }
192 "runtime_definition" | "runtime_definitions" | "runtime_functions" => {
193 module.runtime_functions.extend(
194 load_json_dir::<RuntimeFunctionDefinition>(&path, self.should_break)?,
195 );
196 }
197 "function" | "functions" => {
198 module.functions.extend(load_json_dir::<FunctionDefinition>(
199 &path,
200 self.should_break,
201 )?);
202 }
203 "configuration" | "configurations" => {
204 module.configurations.extend(
205 load_json_dir::<ModuleConfigurationDefinition>(
206 &path,
207 self.should_break,
208 )?,
209 );
210 }
211 _ => {}
212 }
213 }
214
215 module
216 .data_types
217 .retain(|item| item.is_accepted(&self.accepted_version));
218 module
219 .flow_types
220 .retain(|item| item.is_accepted(&self.accepted_version));
221 module
222 .runtime_flow_types
223 .retain(|item| item.is_accepted(&self.accepted_version));
224 module
225 .functions
226 .retain(|item| item.is_accepted(&self.accepted_version));
227 module
228 .runtime_functions
229 .retain(|item| item.is_accepted(&self.accepted_version));
230
231 modules.push(module);
232 }
233
234 Ok(modules)
235 }
236
237 fn feature_allowed(&self, module_name: &str, module_path: &Path) -> bool {
238 if self.accepted_features.is_empty() {
239 return true;
240 }
241
242 let short_name = module_path
243 .file_name()
244 .and_then(|name| name.to_str())
245 .unwrap_or_default();
246
247 self.accepted_features
248 .iter()
249 .any(|feature| feature == module_name || feature == short_name)
250 }
251}
252
253fn read_json_file<T: DeserializeOwned>(path: &Path) -> Result<T, ReaderError> {
254 let content = fs::read_to_string(path).map_err(|error| ReaderError::ReadFileError {
255 path: path.to_path_buf(),
256 error,
257 })?;
258
259 serde_json::from_str::<T>(&content).map_err(|error| ReaderError::JsonError {
260 path: path.to_path_buf(),
261 error,
262 })
263}
264
265fn load_json_dir<T: DeserializeOwned>(
266 dir: &Path,
267 should_break: bool,
268) -> Result<Vec<T>, ReaderError> {
269 let mut items = Vec::new();
270 for file in WalkDir::new(dir)
271 .into_iter()
272 .filter_map(Result::ok)
273 .map(|entry| entry.into_path())
274 .filter(|path| {
275 path.is_file()
276 && path
277 .extension()
278 .and_then(|ext| ext.to_str())
279 .is_some_and(|ext| ext.eq_ignore_ascii_case("json"))
280 })
281 {
282 match read_json_file::<T>(file.as_path()) {
283 Ok(item) => items.push(item),
284 Err(err) if should_break => return Err(err),
285 Err(err) => log::warn!("Skipping invalid definition {}: {:?}", file.display(), err),
286 }
287 }
288
289 Ok(items)
290}
291
292fn find_module_directories(root: &Path) -> Vec<PathBuf> {
293 let mut modules = WalkDir::new(root)
294 .into_iter()
295 .filter_map(Result::ok)
296 .filter(|entry| entry.file_type().is_dir())
297 .map(|entry| entry.into_path())
298 .filter(|path| looks_like_module(path))
299 .collect::<Vec<_>>();
300 modules.sort();
301 modules
302}
303
304fn looks_like_module(path: &Path) -> bool {
305 let entries = match fs::read_dir(path) {
306 Ok(entries) => entries,
307 Err(_) => return false,
308 };
309
310 entries.flatten().any(|entry| {
311 let file_type = match entry.file_type() {
312 Ok(file_type) => file_type,
313 Err(_) => return false,
314 };
315
316 let name = entry.file_name().to_string_lossy().to_string();
317 name == "module.json" || (file_type.is_dir() && is_definition_dir(&name))
318 })
319}
320
/// Returns `true` when `name` is exactly one of the directory names that hold
/// definition files inside a module. The comparison is case-sensitive.
fn is_definition_dir(name: &str) -> bool {
    const DEFINITION_DIRS: [&str; 13] = [
        "flow_type",
        "flow_types",
        "runtime_flow_type",
        "runtime_flow_types",
        "data_type",
        "data_types",
        "runtime_definition",
        "runtime_definitions",
        "runtime_functions",
        "function",
        "functions",
        "configuration",
        "configurations",
    ];

    DEFINITION_DIRS.contains(&name)
}
339
/// Derives the module name from where `module_path` sits relative to `root`.
///
/// The name is the path of `module_path` relative to `root`. When that
/// relative path is unavailable (not below `root`, or not valid UTF-8), empty,
/// or `"."`, the last component of `module_path` is used instead, and
/// `"module"` when even that cannot be obtained.
fn module_name_from_paths(root: &Path, module_path: &Path) -> String {
    let relative = module_path.strip_prefix(root).ok().and_then(Path::to_str);

    match relative {
        Some(rel) if !rel.is_empty() && rel != "." => rel.to_owned(),
        _ => module_path
            .file_name()
            .and_then(|last| last.to_str())
            .unwrap_or("module")
            .to_owned(),
    }
}
356}