1use std::borrow::Borrow;
2use std::collections::HashSet;
3use std::path::Path;
4use std::time::Duration;
5
6use lazy_static::lazy_static;
7use regex::Regex;
8
9use crate::flow::Error::EntryLost;
10use crate::flow::Error::*;
11use crate::value::{Map, Value};
12
13lazy_static! {
14 pub static ref ID_PATTERN: Regex = Regex::new(r"^[\w]{1,50}$").unwrap();
15}
16
17#[derive(thiserror::Error, Debug)]
18pub enum Error {
19 #[error("invalid id `{0}`")]
20 IdInvalid(String),
21 #[error("duplicated id `{0}`")]
22 IdDuplicated(String),
23 #[error("{0} lost entry `{1}`")]
24 EntryLost(String, String),
25 #[error("{0} unexpect entry `{1}`")]
26 EntryUnexpected(String, String),
27 #[error("{0} must {1} but it {2}")]
28 Violation(String, String, String),
29 #[error("{0} unexpect value {1}")]
30 ValueUnexpected(String, String),
31}
32
33#[derive(Debug, Clone)]
34pub struct Flow {
35 flow: Value,
36 meta: Map,
37}
38
39#[derive(Debug, Clone)]
40pub struct Then {
41 cond: Option<String>,
42 reg: Option<Map>,
43 goto: Option<String>,
44}
45
46impl Then {
47 pub fn cond(&self) -> Option<&str> {
48 self.cond.as_ref().map(|s| s.as_str())
49 }
50 pub fn reg(&self) -> Option<&Map> {
51 self.reg.as_ref()
52 }
53 pub fn goto(&self) -> Option<&str> {
54 self.goto.as_ref().map(|g| g.as_str())
55 }
56}
57
58impl Flow {
59 pub fn new(flow: Value, dir: &Path) -> Result<Flow, Error> {
60 let mut meta = Map::new();
61 meta.insert(
62 "task_dir".to_string(),
63 Value::String(dir.to_path_buf().to_str().unwrap().to_string()),
64 );
65
66 let flow = Flow { flow, meta };
67
68 flow._root_check()?;
69 flow._version()?;
70
71 let mut step_id_checked: HashSet<&str> = HashSet::new();
72 let pre_step_id_vec = flow.pre_step_id_vec().unwrap_or(vec![]);
73 for pre_step_id in pre_step_id_vec {
74 if !ID_PATTERN.is_match(pre_step_id) {
75 return Err(IdInvalid(pre_step_id.into()));
76 }
77 if step_id_checked.contains(pre_step_id) {
78 return Err(IdDuplicated(pre_step_id.into()));
79 } else {
80 step_id_checked.insert(pre_step_id.into());
81 }
82
83 flow._pre_check()?;
84 }
85
86 let stage_id_vec = flow._stage_id_vec()?;
87 if stage_id_vec.is_empty() {
88 return Err(EntryLost("root".into(), "stage".into()));
89 }
90
91 for stage_id in stage_id_vec {
92 if !ID_PATTERN.is_match(stage_id) {
93 return Err(IdInvalid(stage_id.into()));
94 }
95
96 flow._stage_check(stage_id)?;
97 flow._stage_concurrency(stage_id)?;
98 flow._stage_duration(stage_id)?;
99 flow._stage_round(stage_id)?;
100 flow._stage_break_on(stage_id)?;
101
102 let stage_step_id_vec = flow._stage_step_id_vec(stage_id)?;
103
104 if stage_step_id_vec.is_empty() {
105 return Err(EntryLost(stage_id.into(), "step".into()));
106 }
107
108 for stage_step_id in stage_step_id_vec {
109 if !ID_PATTERN.is_match(stage_step_id) {
110 return Err(IdInvalid(stage_step_id.into()));
111 }
112
113 if step_id_checked.contains(stage_step_id) {
114 return Err(IdDuplicated(stage_step_id.into()));
115 } else {
116 step_id_checked.insert(stage_step_id.into());
117 }
118 }
119 }
120
121 for step_id in step_id_checked.iter() {
122 flow._step_check(step_id)?;
123
124 for (aid, _) in flow._step_obj(step_id)? {
125 flow._step_action_obj(step_id, aid)?;
126 flow._step_action_func(step_id, aid)?;
127 flow._step_action_args(step_id, aid)?;
128 }
129 }
130
131 return Ok(flow);
132 }
133
134 pub fn version(&self) -> &str {
135 self._version().unwrap()
136 }
137
138 pub fn def(&self) -> Option<&Map> {
139 self.flow["def"].as_object()
140 }
141
142 pub fn meta(&self) -> &Map {
143 &self.meta
144 }
145
146 pub fn stage_step_id_vec(&self, stage_id: &str) -> Vec<&str> {
147 self._stage_step_id_vec(stage_id).unwrap()
148 }
149
150 pub fn pre_step_id_vec(&self) -> Option<Vec<&str>> {
151 let task_step_chain_arr = self.flow["pre"]["step"]
152 .as_object()
153 .map(|p| p.keys().map(|k| k.as_str()).collect());
154 if task_step_chain_arr.is_none() {
155 return None;
156 }
157 let id_vec: Vec<&str> = task_step_chain_arr.unwrap();
158 return if id_vec.is_empty() {
159 None
160 } else {
161 Some(id_vec)
162 };
163 }
164
165 pub fn stage_id_vec(&self) -> Vec<&str> {
166 self._stage_id_vec().unwrap()
167 }
168
169 pub fn stage_loader<'a, 's>(&'s self, stage_id: &'a str) -> &'a Value
170 where
171 's: 'a,
172 {
173 &self.flow["stage"][stage_id]["loader"]
174 }
175
176 pub fn stage_concurrency(&self, stage_id: &str) -> usize {
177 self._stage_concurrency(stage_id).unwrap()
178 }
179
180 pub fn stage_round(&self, stage_id: &str) -> usize {
181 self._stage_round(stage_id).unwrap()
182 }
183
184 pub fn stage_duration(&self, stage_id: &str) -> Duration {
185 self._stage_duration(stage_id).unwrap()
186 }
187
188 pub fn stage_break_on(&self, stage_id: &str) -> &str {
189 self._stage_break_on(stage_id).unwrap()
190 }
191
192 pub fn step_obj(&self, step_id: &str) -> &Map {
193 self._step_obj(step_id).unwrap()
194 }
195
196 pub fn step_action_func(&self, step_id: &str, action_id: &str) -> &str {
197 self._step_action_func(step_id, action_id).unwrap()
198 }
199
200 pub fn step_action_args(&self, step_id: &str, action_id: &str) -> &Value {
201 self._step_action_args(step_id, action_id).unwrap()
202 }
203
204 fn _root_check(&self) -> Result<(), Error> {
208 let enable_keys = vec!["version", "def", "stage", "pre"];
209 let root = self.flow.borrow();
210 let object = root
211 .as_object()
212 .ok_or_else(|| Violation("root".into(), "be a object".into(), "is not".into()))?;
213 for (k, _) in object {
214 if !enable_keys.contains(&k.as_str()) {
215 return Err(EntryUnexpected("root".into(), k.into()));
216 }
217 }
218 return Ok(());
219 }
220
221 fn _version(&self) -> Result<&str, Error> {
222 let v = self.flow["version"]
223 .as_str()
224 .ok_or(EntryLost("root".into(), "version".into()))?;
225
226 if v != "0.0.1" {
227 return Err(Violation("version".into(), "0.0.1".into(), v.into()));
228 } else {
229 Ok(v)
230 }
231 }
232
233 fn _pre_check(&self) -> Result<(), Error> {
234 let enable_keys = vec!["step"];
235 let pre = self.flow["pre"].borrow();
236 let object = pre
237 .as_object()
238 .ok_or_else(|| Violation("pre".into(), "be a object".into(), "is not".into()))?;
239 for (k, _) in object {
240 if !enable_keys.contains(&k.as_str()) {
241 return Err(EntryUnexpected("pre".into(), k.into()));
242 }
243 }
244 return Ok(());
245 }
246
247 fn _stage_id_vec(&self) -> Result<Vec<&str>, Error> {
248 let step_id_vec = self.flow["stage"]
249 .as_object()
250 .map(|p| p.keys().map(|k| k.as_str()).collect())
251 .ok_or(EntryLost("root".into(), "stage".into()))?;
252 return Ok(step_id_vec);
253 }
254
255 fn _stage_check(&self, stage_id: &str) -> Result<(), Error> {
256 let enable_keys = vec![
257 "step",
258 "loader",
259 "concurrency",
260 "round",
261 "duration",
262 "break_on",
263 ];
264 let stage = self.flow["stage"][stage_id].borrow();
265 let object = stage.as_object().ok_or_else(|| {
266 Violation(
267 format!("stage.{}", stage_id),
268 "be a object".into(),
269 "is not".into(),
270 )
271 })?;
272 for (k, _) in object {
273 if !enable_keys.contains(&k.as_str()) {
274 return Err(EntryUnexpected(format!("stage.{}", stage_id), k.into()));
275 }
276 }
277 return Ok(());
278 }
279
280 fn _stage_concurrency(&self, stage_id: &str) -> Result<usize, Error> {
281 let s = self.flow["stage"][stage_id]["concurrency"].as_u64();
282 if s.is_none() {
283 return Ok(10);
284 }
285
286 let s = s.unwrap();
287 if s < 1 {
288 return Err(Violation(
289 format!("stage.{}.concurrency", stage_id),
290 "> 0".into(),
291 format!("is {}", s),
292 ));
293 }
294 Ok(s as usize)
295 }
296
297 fn _stage_round(&self, stage_id: &str) -> Result<usize, Error> {
298 let s = self.flow["stage"][stage_id]["round"].as_u64();
299 if s.is_none() {
300 return Ok(1);
301 }
302
303 let s = s.unwrap();
304 if s < 1 {
305 return Err(Violation(
306 format!("stage.{}.round", stage_id),
307 "> 0".into(),
308 format!("is {}", s),
309 ));
310 }
311 Ok(s as usize)
312 }
313
314 fn _stage_duration(&self, stage_id: &str) -> Result<Duration, Error> {
315 let s = self.flow["stage"][stage_id]["duration"].as_u64();
316 if s.is_none() {
317 return Ok(Duration::from_secs(600));
318 }
319
320 let s = s.unwrap();
321 if s < 1 {
322 return Err(Violation(
323 format!("stage.{}.duration", stage_id),
324 "> 0".into(),
325 format!("is {}", s),
326 ));
327 }
328 Ok(Duration::from_secs(s))
329 }
330
331 fn _stage_break_on(&self, stage_id: &str) -> Result<&str, Error> {
332 let break_on = self.flow["stage"][stage_id]["break_on"]
333 .as_str()
334 .unwrap_or("stage_fail");
335 match break_on {
336 "never" => Ok(break_on),
337 "stage_fail" => Ok(break_on),
338 o => Err(ValueUnexpected(
339 format!("stage.{}.break_on", stage_id),
340 o.into(),
341 )),
342 }
343 }
344
345 fn _stage_step_id_vec(&self, stage_id: &str) -> Result<Vec<&str>, Error> {
346 let step_id_vec: Vec<&str> = self.flow["stage"][stage_id]["step"]
347 .as_object()
348 .map(|p| p.keys().map(|k| k.as_str()).collect())
349 .ok_or(EntryLost(stage_id.into(), "step".into()))?;
350 if step_id_vec.is_empty() {
351 return Err(Violation(
352 format!("stage.{}.step", stage_id),
353 "not empty".into(),
354 "is".into(),
355 ));
356 }
357 return Ok(step_id_vec);
358 }
359
360 fn _step_check(&self, step_id: &str) -> Result<(), Error> {
361 let step = self._step(step_id);
362 let _ = step.as_object().ok_or_else(|| {
363 Violation(
364 format!("step.{}", step_id),
365 "be a object".into(),
366 "is not".into(),
367 )
368 })?;
369 return Ok(());
370 }
371
372 fn _step(&self, step_id: &str) -> &Value {
373 for stage_id in self.stage_id_vec() {
374 let step = self.flow["stage"][stage_id]["step"][step_id].borrow();
375 if !step.is_null() {
376 return step;
377 }
378 }
379
380 return self.flow["pre"]["step"][step_id].borrow();
381 }
382
383 fn _step_obj(&self, step_id: &str) -> Result<&Map, Error> {
384 let step = self._step(step_id);
385 step.as_object().ok_or_else(|| {
386 Violation(
387 format!("step.{}", step_id),
388 "be a object".into(),
389 "is not".into(),
390 )
391 })
392 }
393
394 fn _step_action_obj(&self, step_id: &str, action_id: &str) -> Result<&Map, Error> {
395 let obj = self._step(step_id)[action_id].as_object().ok_or(Violation(
396 format!("{}.{}", step_id, action_id),
397 "be a object".into(),
398 "is not".into(),
399 ))?;
400 return if obj.len() != 1 {
401 Err(Violation(
402 format!("{}.{}", step_id, action_id),
403 "have 1 entry".into(),
404 "is not".into(),
405 ))
406 } else {
407 Ok(obj)
408 };
409 }
410
411 fn _step_action_func(&self, step_id: &str, action_id: &str) -> Result<&str, Error> {
412 let action_obj = self._step_action_obj(step_id, action_id)?;
413 let only = action_obj.iter().last().unwrap();
414 Ok(only.0.as_str())
415 }
416
417 fn _step_action_args(&self, step_id: &str, action_id: &str) -> Result<&Value, Error> {
418 let action_obj = self._step_action_obj(step_id, action_id)?;
419 let only = action_obj.iter().last().unwrap();
420 Ok(only.1)
421 }
422}