code0_flow/flow_definition/
mod.rs1mod error;
2mod feature;
3
4use crate::flow_definition::error::ReaderError;
5use crate::flow_definition::feature::Feature;
6use crate::flow_definition::feature::version::HasVersion;
7use serde::de::DeserializeOwned;
8use std::fs;
9use std::path::Path;
10use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition, Version};
11use walkdir::WalkDir;
12
13pub struct Reader {
14 should_break: bool,
15 accepted_features: Vec<String>,
16 accepted_version: Option<Version>,
17 path: String,
18}
19
20impl Reader {
21 pub fn configure(
22 path: String,
23 should_break: bool,
24 accepted_features: Vec<String>,
25 accepted_version: Option<Version>,
26 ) -> Self {
27 Self {
28 should_break,
29 accepted_features,
30 accepted_version,
31 path,
32 }
33 }
34
35 pub fn read_features(&self) -> Result<Vec<Feature>, ReaderError> {
36 let definitions = Path::new(&self.path);
37
38 match self.read_feature_content(definitions) {
39 Ok(features) => {
40 log::info!(
41 "Loaded {:?} feature/s",
42 &features
43 .iter()
44 .map(|f| f.name.clone())
45 .collect::<Vec<String>>()
46 );
47
48 log::debug!(
49 "Found FlowTypes {:?}",
50 &features
51 .iter()
52 .flat_map(|f| f.flow_types.iter().map(|t| t.identifier.clone()))
53 .collect::<Vec<String>>()
54 );
55
56 log::debug!(
57 "Found DataTypes {:?}",
58 &features
59 .iter()
60 .flat_map(|f| f.data_types.iter().map(|t| t.identifier.clone()))
61 .collect::<Vec<String>>()
62 );
63
64 log::debug!(
65 "Found Functions {:?}",
66 &features
67 .iter()
68 .flat_map(|f| f.functions.iter().map(|t| t.runtime_name.clone()))
69 .collect::<Vec<String>>()
70 );
71
72 Ok(features)
73 }
74 Err(err) => {
75 log::error!("Failed to read feature/s from {}, {:?}", &self.path, err);
76 Err(ReaderError::ReadFeatureError {
77 path: self.path.to_string(),
78 source: Box::new(err),
79 })
80 }
81 }
82 }
83
84 fn read_feature_content(&self, dir: &Path) -> Result<Vec<Feature>, ReaderError> {
85 let mut features: Vec<Feature> = Vec::new();
86
87 let readdir = fs::read_dir(dir).map_err(|err| {
88 log::error!("Failed to read directory {}: {:?}", dir.display(), err);
89 ReaderError::ReadDirectoryError {
90 path: dir.to_path_buf(),
91 error: err,
92 }
93 })?;
94
95 for entry_result in readdir {
96 let entry = match entry_result {
97 Ok(entry) => entry,
98 Err(err) => {
99 log::error!("Failed to read directory entry: {:?}", err);
100 return Err(ReaderError::DirectoryEntryError(err));
101 }
102 };
103
104 let path = entry.path();
105
106 if !path.is_dir() {
107 continue;
108 }
109
110 let feature_name = path
111 .file_name()
112 .unwrap_or_default()
113 .to_string_lossy()
114 .to_string();
115
116 if !self.accepted_features.is_empty() && !self.accepted_features.contains(&feature_name)
117 {
118 log::info!("Skipping not accepted feature: {}", feature_name);
119 continue;
120 }
121
122 let data_types = match self
123 .load_definitions_for_feature::<DefinitionDataType>(&path, "data_type")?
124 {
125 Some(v) => v,
126 None => continue,
127 };
128
129 let flow_types =
130 match self.load_definitions_for_feature::<FlowType>(&path, "flow_type")? {
131 Some(v) => v,
132 None => continue,
133 };
134
135 let functions = match self.load_definitions_for_feature::<RuntimeFunctionDefinition>(
136 &path,
137 "runtime_definition",
138 )? {
139 Some(v) => v,
140 None => continue,
141 };
142
143 let feature = Feature {
144 name: feature_name,
145 data_types,
146 flow_types,
147 functions,
148 };
149
150 features.push(feature);
151 }
152
153 Ok(features)
154 }
155
156 fn load_definitions_for_feature<T>(
157 &self,
158 feature_dir: &Path,
159 sub_dir: &str,
160 ) -> Result<Option<Vec<T>>, ReaderError>
161 where
162 T: DeserializeOwned + HasVersion,
163 {
164 let dir = feature_dir.join(sub_dir);
165
166 let raw: Vec<T> = match self.collect_definitions::<T>(&dir) {
167 Ok(v) => v,
168 Err(err) => {
169 if self.should_break {
170 return Err(ReaderError::ReadFeatureError {
171 path: dir.to_string_lossy().to_string(),
172 source: Box::new(err),
173 });
174 } else {
175 return Ok(None);
177 }
178 }
179 };
180
181 let items = raw
182 .into_iter()
183 .map(|mut v| {
184 v.normalize_version();
185 v
186 })
187 .filter(|v| v.is_accepted(&self.accepted_version))
188 .collect();
189
190 Ok(Some(items))
191 }
192
193 fn collect_definitions<T>(&self, dir: &Path) -> Result<Vec<T>, ReaderError>
194 where
195 T: DeserializeOwned,
196 {
197 let mut definitions = Vec::new();
198
199 if !dir.exists() {
200 return Ok(definitions);
201 }
202
203 for entry in WalkDir::new(dir).into_iter().filter_map(Result::ok) {
204 let path = entry.path();
205
206 if path.is_file() && path.extension().is_some_and(|ext| ext == "json") {
207 let content = match fs::read_to_string(path) {
208 Ok(content) => content,
209 Err(err) => {
210 log::error!("Failed to read file {}: {}", path.display(), err);
211 return Err(ReaderError::ReadFileError {
212 path: path.to_path_buf(),
213 error: err,
214 });
215 }
216 };
217
218 match serde_json::from_str::<T>(&content) {
219 Ok(def) => definitions.push(def),
220 Err(e) => {
221 if self.should_break {
222 log::error!("Failed to parse JSON in file {}: {:?}", path.display(), e);
223 return Err(ReaderError::JsonError {
224 path: path.to_path_buf(),
225 error: e,
226 });
227 } else {
228 log::warn!("Skipping invalid JSON file {}: {:?}", path.display(), e);
229 }
230 }
231 }
232 }
233 }
234
235 Ok(definitions)
236 }
237}