code0_flow/flow_definition/
mod.rs1mod error;
2mod feature;
3
4use crate::flow_definition::error::ReaderError;
5use crate::flow_definition::feature::Feature;
6use serde::de::DeserializeOwned;
7use std::fs;
8use std::path::Path;
9use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition};
10use walkdir::WalkDir;
11use crate::flow_definition::feature::version::HasVersion;
12
13pub struct Reader {
14 should_break: bool,
15 accepted_features: Vec<String>,
16 accepted_version: Option<String>,
17 path: String,
18}
19
20impl Reader {
21 pub fn configure(
22 path: String,
23 should_break: bool,
24 accepted_features: Vec<String>,
25 accepted_version: Option<String>,
26 ) -> Self {
27 Self {
28 should_break,
29 accepted_features,
30 accepted_version,
31 path,
32 }
33 }
34
35 pub fn read_features(&self) -> Result<Vec<Feature>, ReaderError> {
36 let definitions = Path::new(&self.path);
37
38 match self.read_feature_content(definitions) {
39 Ok(features) => {
40 log::info!(
41 "Loaded {:?} feature/s",
42 &features
43 .iter()
44 .map(|f| f.name.clone())
45 .collect::<Vec<String>>()
46 );
47
48 log::debug!(
49 "Found FlowTypes {:?}",
50 &features
51 .iter()
52 .flat_map(|f| f.flow_types.iter().map(|t| t.identifier.clone()))
53 .collect::<Vec<String>>()
54 );
55
56 log::debug!(
57 "Found DataTypes {:?}",
58 &features
59 .iter()
60 .flat_map(|f| f.data_types.iter().map(|t| t.identifier.clone()))
61 .collect::<Vec<String>>()
62 );
63
64 log::debug!(
65 "Found Functions {:?}",
66 &features
67 .iter()
68 .flat_map(|f| f.functions.iter().map(|t| t.runtime_name.clone()))
69 .collect::<Vec<String>>()
70 );
71
72 Ok(features)
73 }
74 Err(err) => {
75 log::error!("Failed to read feature/s from {}, {:?}", &self.path, err);
76 Err(ReaderError::ReadFeatureError {
77 path: self.path.to_string(),
78 source: Box::new(err),
79 })
80 }
81 }
82 }
83
84 fn read_feature_content(&self, dir: &Path) -> Result<Vec<Feature>, ReaderError> {
85 let mut features: Vec<Feature> = Vec::new();
86
87 let readdir = fs::read_dir(dir).map_err(|err| {
88 log::error!("Failed to read directory {}: {:?}", dir.display(), err);
89 ReaderError::ReadDirectoryError {
90 path: dir.to_path_buf(),
91 error: err,
92 }
93 })?;
94
95 for entry_result in readdir {
96 let entry = match entry_result {
97 Ok(entry) => entry,
98 Err(err) => {
99 log::error!("Failed to read directory entry: {:?}", err);
100 return Err(ReaderError::DirectoryEntryError(err));
101 }
102 };
103
104 let path = entry.path();
105
106 if !path.is_dir() {
107 continue;
108 }
109
110 let feature_name = path
111 .file_name()
112 .unwrap_or_default()
113 .to_string_lossy()
114 .to_string();
115
116 if !self.accepted_features.is_empty() && !self.accepted_features.contains(&feature_name)
117 {
118 log::info!("Skipping not accepted feature: {}", feature_name);
119 continue;
120 }
121
122 let data_types = match self
123 .load_definitions_for_feature::<DefinitionDataType>(&path, "data_type")?
124 {
125 Some(v) => v,
126 None => continue,
127 };
128
129 let flow_types =
130 match self.load_definitions_for_feature::<FlowType>(&path, "flow_type")? {
131 Some(v) => v,
132 None => continue,
133 };
134
135 let functions = match self.load_definitions_for_feature::<RuntimeFunctionDefinition>(
136 &path,
137 "runtime_definition",
138 )? {
139 Some(v) => v,
140 None => continue,
141 };
142
143 let feature = Feature {
144 name: feature_name,
145 data_types,
146 flow_types,
147 functions,
148 };
149
150 features.push(feature);
151 }
152
153 Ok(features)
154 }
155
156 fn load_definitions_for_feature<T>(
157 &self,
158 feature_dir: &Path,
159 sub_dir: &str,
160 ) -> Result<Option<Vec<T>>, ReaderError>
161 where
162 T: DeserializeOwned + HasVersion,
163 {
164 let dir = feature_dir.join(sub_dir);
165
166 let raw: Vec<T> = match self.collect_definitions::<T>(&dir) {
167 Ok(v) => v,
168 Err(err) => {
169 if self.should_break {
170 return Err(ReaderError::ReadFeatureError {
171 path: dir.to_string_lossy().to_string(),
172 source: Box::new(err),
173 });
174 } else {
175 return Ok(None);
177 }
178 }
179 };
180
181 let items = raw
182 .into_iter()
183 .filter(|v| v.is_accepted(&self.accepted_version))
184 .collect();
185
186 Ok(Some(items))
187 }
188
189 fn collect_definitions<T>(&self, dir: &Path) -> Result<Vec<T>, ReaderError>
190 where
191 T: DeserializeOwned,
192 {
193 let mut definitions = Vec::new();
194
195 if !dir.exists() {
196 return Ok(definitions);
197 }
198
199 for entry in WalkDir::new(dir).into_iter().filter_map(Result::ok) {
200 let path = entry.path();
201
202 if path.is_file() && path.extension().is_some_and(|ext| ext == "json") {
203 let content = match fs::read_to_string(path) {
204 Ok(content) => content,
205 Err(err) => {
206 log::error!("Failed to read file {}: {}", path.display(), err);
207 return Err(ReaderError::ReadFileError {
208 path: path.to_path_buf(),
209 error: err,
210 });
211 }
212 };
213
214 match serde_json::from_str::<T>(&content) {
215 Ok(def) => definitions.push(def),
216 Err(e) => {
217 if self.should_break {
218 log::error!("Failed to parse JSON in file {}: {:?}", path.display(), e);
219 return Err(ReaderError::JsonError {
220 path: path.to_path_buf(),
221 error: e,
222 });
223 } else {
224 log::warn!("Skipping invalid JSON file {}: {:?}", path.display(), e);
225 }
226 }
227 }
228 }
229 }
230
231 Ok(definitions)
232 }
233}