code0_flow/flow_definition/
mod.rs1mod error;
2mod feature;
3
4use crate::flow_definition::error::ReaderError;
5use crate::flow_definition::feature::Feature;
6use crate::flow_definition::feature::version::HasVersion;
7use serde::de::DeserializeOwned;
8use std::fs;
9use std::path::Path;
10use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition};
11use walkdir::WalkDir;
12
/// Loads feature definitions from a directory tree on disk.
///
/// Each direct sub-directory of `path` is treated as one feature; the
/// definitions of a feature are JSON files stored below that directory.
#[derive(Debug, Clone)]
pub struct Reader {
    /// When `true`, failures while loading a feature's definitions are
    /// returned as errors; when `false`, they are logged and skipped.
    should_break: bool,
    /// Feature (directory) names to load. An empty list accepts every feature.
    accepted_features: Vec<String>,
    /// Version filter handed to `HasVersion::is_accepted` for each definition.
    accepted_version: Option<String>,
    /// Root directory that contains one sub-directory per feature.
    path: String,
}
19
20impl Reader {
21 pub fn configure(
22 path: String,
23 should_break: bool,
24 accepted_features: Vec<String>,
25 accepted_version: Option<String>,
26 ) -> Self {
27 Self {
28 should_break,
29 accepted_features,
30 accepted_version,
31 path,
32 }
33 }
34
35 pub fn read_features(&self) -> Result<Vec<Feature>, ReaderError> {
36 let definitions = Path::new(&self.path);
37
38 match self.read_feature_content(definitions) {
39 Ok(features) => {
40 log::info!(
41 "Loaded {:?} feature/s",
42 &features
43 .iter()
44 .map(|f| f.name.clone())
45 .collect::<Vec<String>>()
46 );
47
48 log::debug!(
49 "Found FlowTypes {:?}",
50 &features
51 .iter()
52 .flat_map(|f| f.flow_types.iter().map(|t| t.identifier.clone()))
53 .collect::<Vec<String>>()
54 );
55
56 log::debug!(
57 "Found DataTypes {:?}",
58 &features
59 .iter()
60 .flat_map(|f| f.data_types.iter().map(|t| t.identifier.clone()))
61 .collect::<Vec<String>>()
62 );
63
64 log::debug!(
65 "Found RuntimeFunctions {:?}",
66 &features
67 .iter()
68 .flat_map(|f| f.runtime_functions.iter().map(|t| t.runtime_name.clone()))
69 .collect::<Vec<String>>()
70 );
71
72 Ok(features)
73 }
74 Err(err) => {
75 log::error!("Failed to read feature/s from {}, {:?}", &self.path, err);
76 Err(ReaderError::ReadFeatureError {
77 path: self.path.to_string(),
78 source: Box::new(err),
79 })
80 }
81 }
82 }
83
84 fn read_feature_content(&self, dir: &Path) -> Result<Vec<Feature>, ReaderError> {
85 let mut features: Vec<Feature> = Vec::new();
86
87 let readdir = fs::read_dir(dir).map_err(|err| {
88 log::error!("Failed to read directory {}: {:?}", dir.display(), err);
89 ReaderError::ReadDirectoryError {
90 path: dir.to_path_buf(),
91 error: err,
92 }
93 })?;
94
95 for entry_result in readdir {
96 let entry = match entry_result {
97 Ok(entry) => entry,
98 Err(err) => {
99 log::error!("Failed to read directory entry: {:?}", err);
100 return Err(ReaderError::DirectoryEntryError(err));
101 }
102 };
103
104 let path = entry.path();
105
106 if !path.is_dir() {
107 continue;
108 }
109
110 let feature_name = path
111 .file_name()
112 .unwrap_or_default()
113 .to_string_lossy()
114 .to_string();
115
116 if !self.accepted_features.is_empty() && !self.accepted_features.contains(&feature_name)
117 {
118 log::info!("Skipping not accepted feature: {}", feature_name);
119 continue;
120 }
121
122 let data_types = match self
123 .load_definitions_for_feature::<DefinitionDataType>(&path, "data_types")?
124 {
125 Some(v) => v,
126 None => continue,
127 };
128
129 let flow_types =
130 match self.load_definitions_for_feature::<FlowType>(&path, "flow_types")? {
131 Some(v) => v,
132 None => continue,
133 };
134
135 let runtime_functions = match self
136 .load_definitions_for_feature::<RuntimeFunctionDefinition>(
137 &path,
138 "runtime_functions",
139 )? {
140 Some(v) => v,
141 None => continue,
142 };
143
144 let functions = match self
145 .load_definitions_for_feature::<FunctionDefinition>(&path, "functions")?
146 {
147 Some(v) => v,
148 None => continue,
149 };
150
151 let feature = Feature {
152 name: feature_name,
153 data_types,
154 flow_types,
155 runtime_functions,
156 functions,
157 };
158
159 features.push(feature);
160 }
161
162 Ok(features)
163 }
164
165 fn load_definitions_for_feature<T>(
166 &self,
167 feature_dir: &Path,
168 sub_dir: &str,
169 ) -> Result<Option<Vec<T>>, ReaderError>
170 where
171 T: DeserializeOwned + HasVersion,
172 {
173 let dir = feature_dir.join(sub_dir);
174
175 let raw: Vec<T> = match self.collect_definitions::<T>(&dir) {
176 Ok(v) => v,
177 Err(err) => {
178 if self.should_break {
179 return Err(ReaderError::ReadFeatureError {
180 path: dir.to_string_lossy().to_string(),
181 source: Box::new(err),
182 });
183 } else {
184 return Ok(None);
186 }
187 }
188 };
189
190 let items = raw
191 .into_iter()
192 .filter(|v| v.is_accepted(&self.accepted_version))
193 .collect();
194
195 Ok(Some(items))
196 }
197
198 fn collect_definitions<T>(&self, dir: &Path) -> Result<Vec<T>, ReaderError>
199 where
200 T: DeserializeOwned,
201 {
202 let mut definitions = Vec::new();
203
204 if !dir.exists() {
205 return Ok(definitions);
206 }
207
208 for entry in WalkDir::new(dir).into_iter().filter_map(Result::ok) {
209 let path = entry.path();
210
211 if path.is_file() && path.extension().is_some_and(|ext| ext == "json") {
212 let content = match fs::read_to_string(path) {
213 Ok(content) => content,
214 Err(err) => {
215 log::error!("Failed to read file {}: {}", path.display(), err);
216 return Err(ReaderError::ReadFileError {
217 path: path.to_path_buf(),
218 error: err,
219 });
220 }
221 };
222
223 match serde_json::from_str::<T>(&content) {
224 Ok(def) => definitions.push(def),
225 Err(e) => {
226 if self.should_break {
227 log::error!("Failed to parse JSON in file {}: {:?}", path.display(), e);
228 return Err(ReaderError::JsonError {
229 path: path.to_path_buf(),
230 error: e,
231 });
232 } else {
233 log::warn!("Skipping invalid JSON file {}: {:?}", path.display(), e);
234 }
235 }
236 }
237 }
238 }
239
240 Ok(definitions)
241 }
242}