chord_flow/flow/task/
mod.rs

1use std::error::Error as StdError;
2use std::sync::Arc;
3
4use futures::future::join_all;
5use handlebars::RenderError;
6use log::{error, info, trace, warn};
7
8use chord_core::case::{CaseAsset, CaseState};
9use chord_core::collection::TailDropVec;
10use chord_core::flow::Flow;
11use chord_core::future::task::{JoinError, JoinHandle, spawn};
12use chord_core::future::time::timeout;
13use chord_core::input::{StageLoader, TaskLoader};
14use chord_core::output::{StageReporter, TaskReporter};
15use chord_core::output::Utc;
16use chord_core::step::{StepAsset, StepState};
17use chord_core::task::{StageAsset, StageState, TaskAsset, TaskId, TaskState};
18use chord_core::value::{json, Map, Value};
19use res::TaskAssetStruct;
20
21use crate::CTX_ID;
22use crate::flow::assign_by_render;
23use crate::flow::case;
24use crate::flow::case::arg::{CaseArgStruct, CaseIdStruct};
25use crate::flow::step::{action_asset_to_value, StepRunner};
26use crate::flow::step::arg::{ArgStruct, ChordStruct};
27use crate::flow::task::arg::TaskIdSimple;
28use crate::flow::task::Error::*;
29use crate::flow::task::res::StageAssetStruct;
30use crate::model::app::{App, RenderContext};
31
32pub mod arg;
33pub mod res;
34
35#[derive(thiserror::Error, Debug)]
36enum Error {
37    #[error("`{0}` render:\n{1}")]
38    Render(String, RenderError),
39
40    #[error("pre:")]
41    PreErr,
42
43    #[error("pre step `{0}`")]
44    PreFail(String),
45
46    #[error("{0} `{1}` reporter error:\n{2}")]
47    Reporter(String, String, Box<dyn StdError + Sync + Send>),
48
49    #[error("{0} `{1}` loader error:\n{2}")]
50    Loader(String, String, Box<dyn StdError + Sync + Send>),
51
52    #[error("stage `{0}` case is empty")]
53    CaseEmpty(String),
54
55    #[error("step `{0}` create:\n{1}")]
56    Step(String, Box<dyn StdError + Sync + Send>),
57
58    #[error("{0}")]
59    Unknown(String)
60}
61
62#[derive()]
63pub struct TaskRunner {
64    step_vec: Arc<TailDropVec<(String, StepRunner)>>,
65    stage_round_no: usize,
66    stage_id: Arc<String>,
67    stage_state: StageState,
68
69    pre_ctx: Option<Arc<Map>>,
70    #[allow(dead_code)]
71    pre_asset: Option<Box<dyn CaseAsset>>,
72    #[allow(dead_code)]
73    pre_step_vec: Option<Arc<TailDropVec<(String, StepRunner)>>>,
74
75    task_state: TaskState,
76
77    def_ctx: Option<Arc<Map>>,
78    reporter: Box<dyn TaskReporter>,
79    loader: Box<dyn TaskLoader>,
80    chord: Arc<ChordStruct>,
81    id: Arc<TaskIdSimple>,
82    flow: Arc<Flow>,
83    app: Arc<dyn App>,
84}
85
86impl TaskRunner {
87    pub fn new(
88        loader: Box<dyn TaskLoader>,
89        reporter: Box<dyn TaskReporter>,
90        app: Arc<dyn App>,
91        flow: Arc<Flow>,
92        id: Arc<TaskIdSimple>,
93    ) -> TaskRunner {
94        let runner = TaskRunner {
95            step_vec: Arc::new(TailDropVec::from(vec![])),
96            stage_id: Arc::new("".into()),
97            stage_round_no: 0,
98
99            stage_state: StageState::Ok,
100
101            pre_ctx: None,
102            pre_asset: None,
103            pre_step_vec: None,
104
105            task_state: TaskState::Ok,
106
107            def_ctx: None,
108            reporter,
109            loader,
110            chord: Arc::new(ChordStruct::new(app.clone())),
111            id,
112            flow,
113            app,
114        };
115
116        runner
117    }
118
119    pub fn id(&self) -> Arc<dyn TaskId> {
120        self.id.clone()
121    }
122
123    pub async fn run(mut self) -> Box<dyn TaskAsset> {
124        trace!("task run  {}", self.id);
125        let start = Utc::now();
126
127        if let Err(e) = self.reporter.start(start).await {
128            error!("task Err  {}", self.id);
129            return Box::new(TaskAssetStruct::new(
130                self.id.clone(),
131                start,
132                Utc::now(),
133                TaskState::Err(Box::new(Reporter(
134                    "task".to_string(),
135                    self.id.task().to_string(),
136                    e,
137                ))),
138            ));
139        }
140
141        if let Some(def_raw) = self.flow.def() {
142            let rc: Value = json!({
143               "__meta__": self.flow.meta()
144            });
145            let rc = RenderContext::wraps(rc).unwrap();
146            let rso = assign_by_render(self.app.get_handlebars(), &rc, def_raw, false);
147            if let Err(e) = rso {
148                error!("task Err  {}", self.id);
149                return Box::new(TaskAssetStruct::new(
150                    self.id.clone(),
151                    start,
152                    Utc::now(),
153                    TaskState::Err(Box::new(Render("def".to_string(), e))),
154                ));
155            } else {
156                self.def_ctx = Some(Arc::new(rso.unwrap()));
157            }
158        }
159
160        if let Some(pre_step_id_vec) = self.flow.pre_step_id_vec() {
161            if !pre_step_id_vec.is_empty() {
162                let pre_step_vec = step_vec_create(
163                    self.app.as_ref(),
164                    self.flow.as_ref(),
165                    pre_step_id_vec.into_iter().map(|s| s.to_owned()).collect(),
166                    self.id.clone(),
167                    self.chord.clone(),
168                )
169                .await;
170                if let Err(e) = pre_step_vec {
171                    error!("task Err  {}", self.id);
172                    return Box::new(TaskAssetStruct::new(
173                        self.id.clone(),
174                        start,
175                        Utc::now(),
176                        TaskState::Err(Box::new(e)),
177                    ));
178                }
179
180                let pre_step_vec = Arc::new(TailDropVec::from(pre_step_vec.unwrap()));
181                let pre_arg = pre_arg(
182                    self.flow.clone(),
183                    self.id.clone(),
184                    self.def_ctx.clone(),
185                    pre_step_vec.clone(),
186                )
187                .await;
188                if let Err(e) = pre_arg {
189                    error!("task Err  {}", self.id);
190                    return Box::new(TaskAssetStruct::new(
191                        self.id,
192                        start,
193                        Utc::now(),
194                        TaskState::Err(Box::new(e)),
195                    ));
196                }
197
198                let pre_asset = case_run(self.app.as_ref(), pre_arg.unwrap()).await;
199
200                match pre_asset.state() {
201                    CaseState::Err(_) => {
202                        error!("task Err  {}", self.id.clone());
203                        return Box::new(TaskAssetStruct::new(
204                            self.id,
205                            start,
206                            Utc::now(),
207                            TaskState::Err(Box::new(PreErr)),
208                        ));
209                    }
210
211                    CaseState::Fail(v) => {
212                        error!("task Err  {}", self.id);
213                        return Box::new(TaskAssetStruct::new(
214                            self.id.clone(),
215                            start,
216                            Utc::now(),
217                            TaskState::Err(Box::new(PreFail(
218                                v.last().unwrap().id().step().to_string(),
219                            ))),
220                        ));
221                    }
222                    CaseState::Ok(sa_vec) => {
223                        let pre_ctx = pre_ctx_create(sa_vec.as_ref()).await;
224                        self.pre_ctx = Some(Arc::new(pre_ctx));
225                        self.pre_asset = Some(pre_asset);
226                        self.pre_step_vec = Some(pre_step_vec);
227                    }
228                }
229            }
230        };
231
232        let result = self.task_run().await;
233
234        let task_asset = if let Err(e) = result {
235            error!("task Err  {}", self.id);
236            TaskAssetStruct::new(
237                self.id.clone(),
238                start,
239                Utc::now(),
240                TaskState::Err(Box::new(e)),
241            )
242        } else {
243            match self.task_state {
244                TaskState::Ok => {
245                    info!("task Ok   {}", self.id.clone());
246                    TaskAssetStruct::new(self.id.clone(), start, Utc::now(), TaskState::Ok)
247                }
248                TaskState::Fail(c) => {
249                    warn!("task Fail {}", self.id);
250                    TaskAssetStruct::new(
251                        self.id.clone(),
252                        start,
253                        Utc::now(),
254                        TaskState::Fail(c.clone()),
255                    )
256                }
257                TaskState::Err(e) => {
258                    error!("task Err  {}", self.id);
259                    TaskAssetStruct::new(self.id.clone(), start, Utc::now(), TaskState::Err(e))
260                }
261            }
262        };
263
264        if let Err(e) = self.reporter.end(&task_asset).await {
265            error!("task Err  {}", self.id);
266            return Box::new(TaskAssetStruct::new(
267                self.id.clone(),
268                start,
269                Utc::now(),
270                TaskState::Err(Box::new(Reporter(
271                    "task".to_string(),
272                    self.id.task().to_string(),
273                    e,
274                ))),
275            ));
276        }
277
278        Box::new(task_asset)
279    }
280
281    async fn task_run(&mut self) -> Result<(), Error> {
282        let stage_id_vec: Vec<String> = self
283            .flow
284            .stage_id_vec()
285            .into_iter()
286            .map(|s| s.to_owned())
287            .collect();
288        for state_id in stage_id_vec {
289            trace!("stage run   {}-{}", self.id, state_id);
290            self.stage_run(state_id.as_str()).await?;
291            if let StageState::Fail(_) = self.stage_state {
292                if "stage_fail" == self.flow.stage_break_on(state_id.as_str()) {
293                    break;
294                }
295            }
296        }
297        Ok(())
298    }
299
300    async fn stage_run(&mut self, stage_id: &str) -> Result<(), Error> {
301        self.stage_id = Arc::new(stage_id.to_string());
302        self.stage_state = StageState::Ok;
303        let step_id_vec: Vec<String> = self
304            .flow
305            .stage_step_id_vec(stage_id)
306            .into_iter()
307            .map(|s| s.to_owned())
308            .collect();
309        let action_vec = step_vec_create(
310            self.app.as_ref(),
311            self.flow.as_ref(),
312            step_id_vec,
313            self.id.clone(),
314            self.chord.clone(),
315        )
316        .await?;
317        self.step_vec = Arc::new(TailDropVec::from(action_vec));
318
319        let duration = self.flow.stage_duration(stage_id);
320        let srr = self.stage_run_round(stage_id);
321        match timeout(duration, srr).await {
322            Ok(r) => r?,
323            Err(_) => (),
324        }
325        return Ok(());
326    }
327
328    async fn stage_run_round(&mut self, stage_id: &str) -> Result<(), Error> {
329        let concurrency = self.flow.stage_concurrency(stage_id);
330        let round_max = self.flow.stage_round(stage_id);
331        let mut round_count = 1;
332        loop {
333            self.stage_round_no = round_count;
334            self.stage_run_once(stage_id, concurrency).await?;
335            if round_count >= round_max {
336                break;
337            }
338            round_count += 1;
339        }
340        return Ok(());
341    }
342
343    async fn stage_run_once(&mut self, stage_id: &str, concurrency: usize) -> Result<(), Error> {
344        let start = Utc::now();
345
346        let mut loader = self
347            .loader
348            .stage(stage_id)
349            .await
350            .map_err(|e| Loader("stage".to_string(), stage_id.to_string(), e))?;
351
352        let mut reporter = self
353            .reporter
354            .stage(stage_id)
355            .await
356            .map_err(|e| Reporter("stage".to_string(), stage_id.to_string(), e))?;
357
358        reporter
359            .start(Utc::now())
360            .await
361            .map_err(|e| Reporter("stage".to_string(), stage_id.to_string(), e))?;
362
363        let result = self
364            .stage_run_io(stage_id, loader.as_mut(), reporter.as_mut(), concurrency)
365            .await;
366
367        let stage_asset = if let Err(e) = result {
368            error!("stage Err  {}-{}, {:?}", self.id, stage_id, e);
369            StageAssetStruct::new(
370                stage_id.to_string(),
371                start,
372                Utc::now(),
373                StageState::Err(Box::new(e)),
374            )
375        } else {
376            match &self.stage_state {
377                StageState::Ok => {
378                    info!("stage Ok   {}-{}", self.id.clone(), stage_id);
379                    StageAssetStruct::new(stage_id.to_string(), start, Utc::now(), StageState::Ok)
380                }
381                StageState::Fail(c) => {
382                    warn!("stage Fail {}-{}", self.id.clone(), stage_id);
383                    StageAssetStruct::new(
384                        stage_id.to_string(),
385                        start,
386                        Utc::now(),
387                        StageState::Fail(c.clone()),
388                    )
389                }
390                StageState::Err(_) => unreachable!(),
391            }
392        };
393
394        reporter
395            .end(&stage_asset)
396            .await
397            .map_err(|e| Reporter("stage".to_string(), stage_id.to_string(), e))?;
398
399        match stage_asset.state() {
400            StageState::Ok => Ok(()),
401            StageState::Fail(_) => Ok(()),
402            StageState::Err(e) => Err(Unknown(e.to_string())),
403        }
404    }
405
406    async fn stage_run_io(
407        &mut self,
408        stage_id: &str,
409        loader: &mut dyn StageLoader,
410        reporter: &mut dyn StageReporter,
411        concurrency: usize,
412    ) -> Result<(), Error> {
413        let mut load_times = 0;
414        loop {
415            let case_data_vec: Vec<(String, Value)> = loader
416                .load(concurrency)
417                .await
418                .map_err(|e| Loader("stage".to_string(), stage_id.to_string(), e))?;
419            load_times = load_times + 1;
420            if case_data_vec.len() == 0 {
421                return if load_times == 1 {
422                    Err(CaseEmpty(stage_id.to_string()))
423                } else {
424                    trace!("stage exhaust data {}, {}", self.id, stage_id);
425                    Ok(())
426                };
427            }
428
429            trace!(
430                "stage load data {}, {}, {}",
431                self.id,
432                stage_id,
433                case_data_vec.len()
434            );
435
436            let case_asset_vec = self.case_data_vec_run(case_data_vec, concurrency).await;
437            let first_fail = case_asset_vec.iter().find(|ca| !ca.state().is_ok());
438            if first_fail.is_some() {
439                let cause_case = first_fail.unwrap();
440                let cause = match cause_case.state() {
441                    CaseState::Err(_) => format!("case: {}", cause_case.id()),
442                    CaseState::Fail(v) => {
443                        let last_step_id = v
444                            .last()
445                            .map(|s| s.id().to_string())
446                            .or_else(|| Some(String::new()))
447                            .unwrap();
448                        format!("step: {}", last_step_id)
449                    }
450                    CaseState::Ok(_) => String::new(),
451                };
452                self.stage_state = StageState::Fail(cause.clone());
453                self.task_state = TaskState::Fail(cause.clone());
454            }
455            reporter
456                .report(&case_asset_vec)
457                .await
458                .map_err(|e| Reporter("stage".to_string(), stage_id.to_string(), e))?;
459        }
460    }
461
462    async fn case_data_vec_run(
463        &mut self,
464        case_vec: Vec<(String, Value)>,
465        concurrency: usize,
466    ) -> Vec<Box<dyn CaseAsset>> {
467        let ca_vec = self.case_arg_vec(case_vec);
468
469        let mut case_join_result_vec = Vec::<Result<Box<dyn CaseAsset>, JoinError>>::new();
470        let mut futures = vec![];
471        for ca in ca_vec {
472            let f = case_spawn(self.app.clone(), ca);
473            futures.push(f);
474            if futures.len() >= concurrency {
475                let case_asset = join_all(futures.split_off(0)).await;
476                case_join_result_vec.extend(case_asset);
477            }
478        }
479        if !futures.is_empty() {
480            let case_asset = join_all(futures).await;
481            case_join_result_vec.extend(case_asset);
482        }
483
484        let mut case_asset_vec = Vec::with_capacity(case_join_result_vec.len());
485        for res in case_join_result_vec {
486            case_asset_vec.push(res.unwrap());
487        }
488        case_asset_vec
489    }
490
491    fn case_arg_vec<'p>(&self, data: Vec<(String, Value)>) -> Vec<CaseArgStruct> {
492        let vec = data
493            .into_iter()
494            .map(|(id, d)| {
495                CaseArgStruct::new(
496                    self.flow.clone(),
497                    self.step_vec.clone(),
498                    d,
499                    self.pre_ctx.clone(),
500                    self.def_ctx.clone(),
501                    self.id.clone(),
502                    self.stage_id.clone(),
503                    Arc::new(self.stage_round_no.to_string()),
504                    id,
505                )
506            })
507            .collect();
508        return vec;
509    }
510}
511
512async fn pre_arg(
513    flow: Arc<Flow>,
514    task_id: Arc<TaskIdSimple>,
515    def_ctx: Option<Arc<Map>>,
516    pre_step_vec: Arc<TailDropVec<(String, StepRunner)>>,
517) -> Result<CaseArgStruct, Error> {
518    Ok(CaseArgStruct::new(
519        flow.clone(),
520        pre_step_vec,
521        Value::Null,
522        None,
523        def_ctx,
524        task_id.clone(),
525        Arc::new("pre".into()),
526        Arc::new("1".into()),
527        "1".into(),
528    ))
529}
530
531async fn pre_ctx_create(sa_vec: &Vec<Box<dyn StepAsset>>) -> Map {
532    let mut pre_ctx = Map::new();
533    pre_ctx.insert("step".to_owned(), Value::Object(Map::new()));
534    for sa in sa_vec.iter() {
535        if let StepState::Ok(av) = sa.state() {
536            let mut am = Map::new();
537            for a in av.iter() {
538                am.insert(a.id().to_string(), action_asset_to_value(a.as_ref()));
539            }
540            pre_ctx["step"][sa.id().step()] = Value::Object(am);
541        }
542    }
543    pre_ctx
544}
545
546
547async fn step_vec_create(
548    app: &dyn App,
549    flow: &Flow,
550    step_id_vec: Vec<String>,
551    task_id: Arc<TaskIdSimple>,
552    chord: Arc<ChordStruct>,
553) -> Result<Vec<(String, StepRunner)>, Error> {
554    let mut step_vec = vec![];
555    let case_id = Arc::new(CaseIdStruct::new(
556        task_id,
557        Arc::new("create".to_string()),
558        Arc::new("1".to_string()),
559        "pre".to_string(),
560    ));
561    for sid in step_id_vec {
562        let mut arg = ArgStruct::new(
563            app,
564            flow,
565            RenderContext::wraps(Value::Object(Map::with_capacity(0)))
566                .map_err(|e| Step(sid.clone(), Box::new(e)))?,
567            case_id.clone(),
568            sid.clone(),
569        );
570
571        let pr = StepRunner::new(chord.clone(), &mut arg)
572            .await
573            .map_err(|e| Step(sid.clone(), Box::new(e)))?;
574        step_vec.push((sid, pr));
575    }
576    Ok(step_vec)
577}
578
579async fn case_run(flow_ctx: &dyn App, case_arg: CaseArgStruct) -> Box<dyn CaseAsset> {
580    Box::new(case::run(flow_ctx, case_arg).await)
581}
582
583fn case_spawn(flow_ctx: Arc<dyn App>, case_arg: CaseArgStruct) -> JoinHandle<Box<dyn CaseAsset>> {
584    spawn(case_run_arc(flow_ctx, case_arg))
585}
586
587async fn case_run_arc(flow_ctx: Arc<dyn App>, case_arg: CaseArgStruct) -> Box<dyn CaseAsset> {
588    CTX_ID
589        .scope(
590            case_arg.id().to_string(),
591            case_run(flow_ctx.as_ref(), case_arg),
592        )
593        .await
594}