chord_core/
flow.rs

1use std::borrow::Borrow;
2use std::collections::HashSet;
3use std::path::Path;
4use std::time::Duration;
5
6use lazy_static::lazy_static;
7use regex::Regex;
8
9use crate::flow::Error::EntryLost;
10use crate::flow::Error::*;
11use crate::value::{Map, Value};
12
13lazy_static! {
14    pub static ref ID_PATTERN: Regex = Regex::new(r"^[\w]{1,50}$").unwrap();
15}
16
17#[derive(thiserror::Error, Debug)]
18pub enum Error {
19    #[error("invalid id `{0}`")]
20    IdInvalid(String),
21    #[error("duplicated id `{0}`")]
22    IdDuplicated(String),
23    #[error("{0} lost entry `{1}`")]
24    EntryLost(String, String),
25    #[error("{0} unexpect entry `{1}`")]
26    EntryUnexpected(String, String),
27    #[error("{0} must {1} but it {2}")]
28    Violation(String, String, String),
29    #[error("{0} unexpect value {1}")]
30    ValueUnexpected(String, String),
31}
32
33#[derive(Debug, Clone)]
34pub struct Flow {
35    flow: Value,
36    meta: Map,
37}
38
39#[derive(Debug, Clone)]
40pub struct Then {
41    cond: Option<String>,
42    reg: Option<Map>,
43    goto: Option<String>,
44}
45
46impl Then {
47    pub fn cond(&self) -> Option<&str> {
48        self.cond.as_ref().map(|s| s.as_str())
49    }
50    pub fn reg(&self) -> Option<&Map> {
51        self.reg.as_ref()
52    }
53    pub fn goto(&self) -> Option<&str> {
54        self.goto.as_ref().map(|g| g.as_str())
55    }
56}
57
58impl Flow {
59    pub fn new(flow: Value, dir: &Path) -> Result<Flow, Error> {
60        let mut meta = Map::new();
61        meta.insert(
62            "task_dir".to_string(),
63            Value::String(dir.to_path_buf().to_str().unwrap().to_string()),
64        );
65
66        let flow = Flow { flow, meta };
67
68        flow._root_check()?;
69        flow._version()?;
70
71        let mut step_id_checked: HashSet<&str> = HashSet::new();
72        let pre_step_id_vec = flow.pre_step_id_vec().unwrap_or(vec![]);
73        for pre_step_id in pre_step_id_vec {
74            if !ID_PATTERN.is_match(pre_step_id) {
75                return Err(IdInvalid(pre_step_id.into()));
76            }
77            if step_id_checked.contains(pre_step_id) {
78                return Err(IdDuplicated(pre_step_id.into()));
79            } else {
80                step_id_checked.insert(pre_step_id.into());
81            }
82
83            flow._pre_check()?;
84        }
85
86        let stage_id_vec = flow._stage_id_vec()?;
87        if stage_id_vec.is_empty() {
88            return Err(EntryLost("root".into(), "stage".into()));
89        }
90
91        for stage_id in stage_id_vec {
92            if !ID_PATTERN.is_match(stage_id) {
93                return Err(IdInvalid(stage_id.into()));
94            }
95
96            flow._stage_check(stage_id)?;
97            flow._stage_concurrency(stage_id)?;
98            flow._stage_duration(stage_id)?;
99            flow._stage_round(stage_id)?;
100            flow._stage_break_on(stage_id)?;
101
102            let stage_step_id_vec = flow._stage_step_id_vec(stage_id)?;
103
104            if stage_step_id_vec.is_empty() {
105                return Err(EntryLost(stage_id.into(), "step".into()));
106            }
107
108            for stage_step_id in stage_step_id_vec {
109                if !ID_PATTERN.is_match(stage_step_id) {
110                    return Err(IdInvalid(stage_step_id.into()));
111                }
112
113                if step_id_checked.contains(stage_step_id) {
114                    return Err(IdDuplicated(stage_step_id.into()));
115                } else {
116                    step_id_checked.insert(stage_step_id.into());
117                }
118            }
119        }
120
121        for step_id in step_id_checked.iter() {
122            flow._step_check(step_id)?;
123
124            for (aid, _) in flow._step_obj(step_id)? {
125                flow._step_action_obj(step_id, aid)?;
126                flow._step_action_func(step_id, aid)?;
127                flow._step_action_args(step_id, aid)?;
128            }
129        }
130
131        return Ok(flow);
132    }
133
134    pub fn version(&self) -> &str {
135        self._version().unwrap()
136    }
137
138    pub fn def(&self) -> Option<&Map> {
139        self.flow["def"].as_object()
140    }
141
142    pub fn meta(&self) -> &Map {
143        &self.meta
144    }
145
146    pub fn stage_step_id_vec(&self, stage_id: &str) -> Vec<&str> {
147        self._stage_step_id_vec(stage_id).unwrap()
148    }
149
150    pub fn pre_step_id_vec(&self) -> Option<Vec<&str>> {
151        let task_step_chain_arr = self.flow["pre"]["step"]
152            .as_object()
153            .map(|p| p.keys().map(|k| k.as_str()).collect());
154        if task_step_chain_arr.is_none() {
155            return None;
156        }
157        let id_vec: Vec<&str> = task_step_chain_arr.unwrap();
158        return if id_vec.is_empty() {
159            None
160        } else {
161            Some(id_vec)
162        };
163    }
164
165    pub fn stage_id_vec(&self) -> Vec<&str> {
166        self._stage_id_vec().unwrap()
167    }
168
169    pub fn stage_loader<'a, 's>(&'s self, stage_id: &'a str) -> &'a Value
170    where
171        's: 'a,
172    {
173        &self.flow["stage"][stage_id]["loader"]
174    }
175
176    pub fn stage_concurrency(&self, stage_id: &str) -> usize {
177        self._stage_concurrency(stage_id).unwrap()
178    }
179
180    pub fn stage_round(&self, stage_id: &str) -> usize {
181        self._stage_round(stage_id).unwrap()
182    }
183
184    pub fn stage_duration(&self, stage_id: &str) -> Duration {
185        self._stage_duration(stage_id).unwrap()
186    }
187
188    pub fn stage_break_on(&self, stage_id: &str) -> &str {
189        self._stage_break_on(stage_id).unwrap()
190    }
191
192    pub fn step_obj(&self, step_id: &str) -> &Map {
193        self._step_obj(step_id).unwrap()
194    }
195
196    pub fn step_action_func(&self, step_id: &str, action_id: &str) -> &str {
197        self._step_action_func(step_id, action_id).unwrap()
198    }
199
200    pub fn step_action_args(&self, step_id: &str, action_id: &str) -> &Value {
201        self._step_action_args(step_id, action_id).unwrap()
202    }
203
204    // -----------------------------------------------
205    // private
206
207    fn _root_check(&self) -> Result<(), Error> {
208        let enable_keys = vec!["version", "def", "stage", "pre"];
209        let root = self.flow.borrow();
210        let object = root
211            .as_object()
212            .ok_or_else(|| Violation("root".into(), "be a object".into(), "is not".into()))?;
213        for (k, _) in object {
214            if !enable_keys.contains(&k.as_str()) {
215                return Err(EntryUnexpected("root".into(), k.into()));
216            }
217        }
218        return Ok(());
219    }
220
221    fn _version(&self) -> Result<&str, Error> {
222        let v = self.flow["version"]
223            .as_str()
224            .ok_or(EntryLost("root".into(), "version".into()))?;
225
226        if v != "0.0.1" {
227            return Err(Violation("version".into(), "0.0.1".into(), v.into()));
228        } else {
229            Ok(v)
230        }
231    }
232
233    fn _pre_check(&self) -> Result<(), Error> {
234        let enable_keys = vec!["step"];
235        let pre = self.flow["pre"].borrow();
236        let object = pre
237            .as_object()
238            .ok_or_else(|| Violation("pre".into(), "be a object".into(), "is not".into()))?;
239        for (k, _) in object {
240            if !enable_keys.contains(&k.as_str()) {
241                return Err(EntryUnexpected("pre".into(), k.into()));
242            }
243        }
244        return Ok(());
245    }
246
247    fn _stage_id_vec(&self) -> Result<Vec<&str>, Error> {
248        let step_id_vec = self.flow["stage"]
249            .as_object()
250            .map(|p| p.keys().map(|k| k.as_str()).collect())
251            .ok_or(EntryLost("root".into(), "stage".into()))?;
252        return Ok(step_id_vec);
253    }
254
255    fn _stage_check(&self, stage_id: &str) -> Result<(), Error> {
256        let enable_keys = vec![
257            "step",
258            "loader",
259            "concurrency",
260            "round",
261            "duration",
262            "break_on",
263        ];
264        let stage = self.flow["stage"][stage_id].borrow();
265        let object = stage.as_object().ok_or_else(|| {
266            Violation(
267                format!("stage.{}", stage_id),
268                "be a object".into(),
269                "is not".into(),
270            )
271        })?;
272        for (k, _) in object {
273            if !enable_keys.contains(&k.as_str()) {
274                return Err(EntryUnexpected(format!("stage.{}", stage_id), k.into()));
275            }
276        }
277        return Ok(());
278    }
279
280    fn _stage_concurrency(&self, stage_id: &str) -> Result<usize, Error> {
281        let s = self.flow["stage"][stage_id]["concurrency"].as_u64();
282        if s.is_none() {
283            return Ok(10);
284        }
285
286        let s = s.unwrap();
287        if s < 1 {
288            return Err(Violation(
289                format!("stage.{}.concurrency", stage_id),
290                "> 0".into(),
291                format!("is {}", s),
292            ));
293        }
294        Ok(s as usize)
295    }
296
297    fn _stage_round(&self, stage_id: &str) -> Result<usize, Error> {
298        let s = self.flow["stage"][stage_id]["round"].as_u64();
299        if s.is_none() {
300            return Ok(1);
301        }
302
303        let s = s.unwrap();
304        if s < 1 {
305            return Err(Violation(
306                format!("stage.{}.round", stage_id),
307                "> 0".into(),
308                format!("is {}", s),
309            ));
310        }
311        Ok(s as usize)
312    }
313
314    fn _stage_duration(&self, stage_id: &str) -> Result<Duration, Error> {
315        let s = self.flow["stage"][stage_id]["duration"].as_u64();
316        if s.is_none() {
317            return Ok(Duration::from_secs(600));
318        }
319
320        let s = s.unwrap();
321        if s < 1 {
322            return Err(Violation(
323                format!("stage.{}.duration", stage_id),
324                "> 0".into(),
325                format!("is {}", s),
326            ));
327        }
328        Ok(Duration::from_secs(s))
329    }
330
331    fn _stage_break_on(&self, stage_id: &str) -> Result<&str, Error> {
332        let break_on = self.flow["stage"][stage_id]["break_on"]
333            .as_str()
334            .unwrap_or("stage_fail");
335        match break_on {
336            "never" => Ok(break_on),
337            "stage_fail" => Ok(break_on),
338            o => Err(ValueUnexpected(
339                format!("stage.{}.break_on", stage_id),
340                o.into(),
341            )),
342        }
343    }
344
345    fn _stage_step_id_vec(&self, stage_id: &str) -> Result<Vec<&str>, Error> {
346        let step_id_vec: Vec<&str> = self.flow["stage"][stage_id]["step"]
347            .as_object()
348            .map(|p| p.keys().map(|k| k.as_str()).collect())
349            .ok_or(EntryLost(stage_id.into(), "step".into()))?;
350        if step_id_vec.is_empty() {
351            return Err(Violation(
352                format!("stage.{}.step", stage_id),
353                "not empty".into(),
354                "is".into(),
355            ));
356        }
357        return Ok(step_id_vec);
358    }
359
360    fn _step_check(&self, step_id: &str) -> Result<(), Error> {
361        let step = self._step(step_id);
362        let _ = step.as_object().ok_or_else(|| {
363            Violation(
364                format!("step.{}", step_id),
365                "be a object".into(),
366                "is not".into(),
367            )
368        })?;
369        return Ok(());
370    }
371
372    fn _step(&self, step_id: &str) -> &Value {
373        for stage_id in self.stage_id_vec() {
374            let step = self.flow["stage"][stage_id]["step"][step_id].borrow();
375            if !step.is_null() {
376                return step;
377            }
378        }
379
380        return self.flow["pre"]["step"][step_id].borrow();
381    }
382
383    fn _step_obj(&self, step_id: &str) -> Result<&Map, Error> {
384        let step = self._step(step_id);
385        step.as_object().ok_or_else(|| {
386            Violation(
387                format!("step.{}", step_id),
388                "be a object".into(),
389                "is not".into(),
390            )
391        })
392    }
393
394    fn _step_action_obj(&self, step_id: &str, action_id: &str) -> Result<&Map, Error> {
395        let obj = self._step(step_id)[action_id].as_object().ok_or(Violation(
396            format!("{}.{}", step_id, action_id),
397            "be a object".into(),
398            "is not".into(),
399        ))?;
400        return if obj.len() != 1 {
401            Err(Violation(
402                format!("{}.{}", step_id, action_id),
403                "have 1 entry".into(),
404                "is not".into(),
405            ))
406        } else {
407            Ok(obj)
408        };
409    }
410
411    fn _step_action_func(&self, step_id: &str, action_id: &str) -> Result<&str, Error> {
412        let action_obj = self._step_action_obj(step_id, action_id)?;
413        let only = action_obj.iter().last().unwrap();
414        Ok(only.0.as_str())
415    }
416
417    fn _step_action_args(&self, step_id: &str, action_id: &str) -> Result<&Value, Error> {
418        let action_obj = self._step_action_obj(step_id, action_id)?;
419        let only = action_obj.iter().last().unwrap();
420        Ok(only.1)
421    }
422}