1use std::error::Error as StdError;
2use std::sync::Arc;
3
4use futures::future::join_all;
5use handlebars::RenderError;
6use log::{error, info, trace, warn};
7
8use chord_core::case::{CaseAsset, CaseState};
9use chord_core::collection::TailDropVec;
10use chord_core::flow::Flow;
11use chord_core::future::task::{JoinError, JoinHandle, spawn};
12use chord_core::future::time::timeout;
13use chord_core::input::{StageLoader, TaskLoader};
14use chord_core::output::{StageReporter, TaskReporter};
15use chord_core::output::Utc;
16use chord_core::step::{StepAsset, StepState};
17use chord_core::task::{StageAsset, StageState, TaskAsset, TaskId, TaskState};
18use chord_core::value::{json, Map, Value};
19use res::TaskAssetStruct;
20
21use crate::CTX_ID;
22use crate::flow::assign_by_render;
23use crate::flow::case;
24use crate::flow::case::arg::{CaseArgStruct, CaseIdStruct};
25use crate::flow::step::{action_asset_to_value, StepRunner};
26use crate::flow::step::arg::{ArgStruct, ChordStruct};
27use crate::flow::task::arg::TaskIdSimple;
28use crate::flow::task::Error::*;
29use crate::flow::task::res::StageAssetStruct;
30use crate::model::app::{App, RenderContext};
31
32pub mod arg;
33pub mod res;
34
35#[derive(thiserror::Error, Debug)]
36enum Error {
37 #[error("`{0}` render:\n{1}")]
38 Render(String, RenderError),
39
40 #[error("pre:")]
41 PreErr,
42
43 #[error("pre step `{0}`")]
44 PreFail(String),
45
46 #[error("{0} `{1}` reporter error:\n{2}")]
47 Reporter(String, String, Box<dyn StdError + Sync + Send>),
48
49 #[error("{0} `{1}` loader error:\n{2}")]
50 Loader(String, String, Box<dyn StdError + Sync + Send>),
51
52 #[error("stage `{0}` case is empty")]
53 CaseEmpty(String),
54
55 #[error("step `{0}` create:\n{1}")]
56 Step(String, Box<dyn StdError + Sync + Send>),
57
58 #[error("{0}")]
59 Unknown(String)
60}
61
62#[derive()]
63pub struct TaskRunner {
64 step_vec: Arc<TailDropVec<(String, StepRunner)>>,
65 stage_round_no: usize,
66 stage_id: Arc<String>,
67 stage_state: StageState,
68
69 pre_ctx: Option<Arc<Map>>,
70 #[allow(dead_code)]
71 pre_asset: Option<Box<dyn CaseAsset>>,
72 #[allow(dead_code)]
73 pre_step_vec: Option<Arc<TailDropVec<(String, StepRunner)>>>,
74
75 task_state: TaskState,
76
77 def_ctx: Option<Arc<Map>>,
78 reporter: Box<dyn TaskReporter>,
79 loader: Box<dyn TaskLoader>,
80 chord: Arc<ChordStruct>,
81 id: Arc<TaskIdSimple>,
82 flow: Arc<Flow>,
83 app: Arc<dyn App>,
84}
85
86impl TaskRunner {
87 pub fn new(
88 loader: Box<dyn TaskLoader>,
89 reporter: Box<dyn TaskReporter>,
90 app: Arc<dyn App>,
91 flow: Arc<Flow>,
92 id: Arc<TaskIdSimple>,
93 ) -> TaskRunner {
94 let runner = TaskRunner {
95 step_vec: Arc::new(TailDropVec::from(vec![])),
96 stage_id: Arc::new("".into()),
97 stage_round_no: 0,
98
99 stage_state: StageState::Ok,
100
101 pre_ctx: None,
102 pre_asset: None,
103 pre_step_vec: None,
104
105 task_state: TaskState::Ok,
106
107 def_ctx: None,
108 reporter,
109 loader,
110 chord: Arc::new(ChordStruct::new(app.clone())),
111 id,
112 flow,
113 app,
114 };
115
116 runner
117 }
118
119 pub fn id(&self) -> Arc<dyn TaskId> {
120 self.id.clone()
121 }
122
123 pub async fn run(mut self) -> Box<dyn TaskAsset> {
124 trace!("task run {}", self.id);
125 let start = Utc::now();
126
127 if let Err(e) = self.reporter.start(start).await {
128 error!("task Err {}", self.id);
129 return Box::new(TaskAssetStruct::new(
130 self.id.clone(),
131 start,
132 Utc::now(),
133 TaskState::Err(Box::new(Reporter(
134 "task".to_string(),
135 self.id.task().to_string(),
136 e,
137 ))),
138 ));
139 }
140
141 if let Some(def_raw) = self.flow.def() {
142 let rc: Value = json!({
143 "__meta__": self.flow.meta()
144 });
145 let rc = RenderContext::wraps(rc).unwrap();
146 let rso = assign_by_render(self.app.get_handlebars(), &rc, def_raw, false);
147 if let Err(e) = rso {
148 error!("task Err {}", self.id);
149 return Box::new(TaskAssetStruct::new(
150 self.id.clone(),
151 start,
152 Utc::now(),
153 TaskState::Err(Box::new(Render("def".to_string(), e))),
154 ));
155 } else {
156 self.def_ctx = Some(Arc::new(rso.unwrap()));
157 }
158 }
159
160 if let Some(pre_step_id_vec) = self.flow.pre_step_id_vec() {
161 if !pre_step_id_vec.is_empty() {
162 let pre_step_vec = step_vec_create(
163 self.app.as_ref(),
164 self.flow.as_ref(),
165 pre_step_id_vec.into_iter().map(|s| s.to_owned()).collect(),
166 self.id.clone(),
167 self.chord.clone(),
168 )
169 .await;
170 if let Err(e) = pre_step_vec {
171 error!("task Err {}", self.id);
172 return Box::new(TaskAssetStruct::new(
173 self.id.clone(),
174 start,
175 Utc::now(),
176 TaskState::Err(Box::new(e)),
177 ));
178 }
179
180 let pre_step_vec = Arc::new(TailDropVec::from(pre_step_vec.unwrap()));
181 let pre_arg = pre_arg(
182 self.flow.clone(),
183 self.id.clone(),
184 self.def_ctx.clone(),
185 pre_step_vec.clone(),
186 )
187 .await;
188 if let Err(e) = pre_arg {
189 error!("task Err {}", self.id);
190 return Box::new(TaskAssetStruct::new(
191 self.id,
192 start,
193 Utc::now(),
194 TaskState::Err(Box::new(e)),
195 ));
196 }
197
198 let pre_asset = case_run(self.app.as_ref(), pre_arg.unwrap()).await;
199
200 match pre_asset.state() {
201 CaseState::Err(_) => {
202 error!("task Err {}", self.id.clone());
203 return Box::new(TaskAssetStruct::new(
204 self.id,
205 start,
206 Utc::now(),
207 TaskState::Err(Box::new(PreErr)),
208 ));
209 }
210
211 CaseState::Fail(v) => {
212 error!("task Err {}", self.id);
213 return Box::new(TaskAssetStruct::new(
214 self.id.clone(),
215 start,
216 Utc::now(),
217 TaskState::Err(Box::new(PreFail(
218 v.last().unwrap().id().step().to_string(),
219 ))),
220 ));
221 }
222 CaseState::Ok(sa_vec) => {
223 let pre_ctx = pre_ctx_create(sa_vec.as_ref()).await;
224 self.pre_ctx = Some(Arc::new(pre_ctx));
225 self.pre_asset = Some(pre_asset);
226 self.pre_step_vec = Some(pre_step_vec);
227 }
228 }
229 }
230 };
231
232 let result = self.task_run().await;
233
234 let task_asset = if let Err(e) = result {
235 error!("task Err {}", self.id);
236 TaskAssetStruct::new(
237 self.id.clone(),
238 start,
239 Utc::now(),
240 TaskState::Err(Box::new(e)),
241 )
242 } else {
243 match self.task_state {
244 TaskState::Ok => {
245 info!("task Ok {}", self.id.clone());
246 TaskAssetStruct::new(self.id.clone(), start, Utc::now(), TaskState::Ok)
247 }
248 TaskState::Fail(c) => {
249 warn!("task Fail {}", self.id);
250 TaskAssetStruct::new(
251 self.id.clone(),
252 start,
253 Utc::now(),
254 TaskState::Fail(c.clone()),
255 )
256 }
257 TaskState::Err(e) => {
258 error!("task Err {}", self.id);
259 TaskAssetStruct::new(self.id.clone(), start, Utc::now(), TaskState::Err(e))
260 }
261 }
262 };
263
264 if let Err(e) = self.reporter.end(&task_asset).await {
265 error!("task Err {}", self.id);
266 return Box::new(TaskAssetStruct::new(
267 self.id.clone(),
268 start,
269 Utc::now(),
270 TaskState::Err(Box::new(Reporter(
271 "task".to_string(),
272 self.id.task().to_string(),
273 e,
274 ))),
275 ));
276 }
277
278 Box::new(task_asset)
279 }
280
281 async fn task_run(&mut self) -> Result<(), Error> {
282 let stage_id_vec: Vec<String> = self
283 .flow
284 .stage_id_vec()
285 .into_iter()
286 .map(|s| s.to_owned())
287 .collect();
288 for state_id in stage_id_vec {
289 trace!("stage run {}-{}", self.id, state_id);
290 self.stage_run(state_id.as_str()).await?;
291 if let StageState::Fail(_) = self.stage_state {
292 if "stage_fail" == self.flow.stage_break_on(state_id.as_str()) {
293 break;
294 }
295 }
296 }
297 Ok(())
298 }
299
300 async fn stage_run(&mut self, stage_id: &str) -> Result<(), Error> {
301 self.stage_id = Arc::new(stage_id.to_string());
302 self.stage_state = StageState::Ok;
303 let step_id_vec: Vec<String> = self
304 .flow
305 .stage_step_id_vec(stage_id)
306 .into_iter()
307 .map(|s| s.to_owned())
308 .collect();
309 let action_vec = step_vec_create(
310 self.app.as_ref(),
311 self.flow.as_ref(),
312 step_id_vec,
313 self.id.clone(),
314 self.chord.clone(),
315 )
316 .await?;
317 self.step_vec = Arc::new(TailDropVec::from(action_vec));
318
319 let duration = self.flow.stage_duration(stage_id);
320 let srr = self.stage_run_round(stage_id);
321 match timeout(duration, srr).await {
322 Ok(r) => r?,
323 Err(_) => (),
324 }
325 return Ok(());
326 }
327
328 async fn stage_run_round(&mut self, stage_id: &str) -> Result<(), Error> {
329 let concurrency = self.flow.stage_concurrency(stage_id);
330 let round_max = self.flow.stage_round(stage_id);
331 let mut round_count = 1;
332 loop {
333 self.stage_round_no = round_count;
334 self.stage_run_once(stage_id, concurrency).await?;
335 if round_count >= round_max {
336 break;
337 }
338 round_count += 1;
339 }
340 return Ok(());
341 }
342
343 async fn stage_run_once(&mut self, stage_id: &str, concurrency: usize) -> Result<(), Error> {
344 let start = Utc::now();
345
346 let mut loader = self
347 .loader
348 .stage(stage_id)
349 .await
350 .map_err(|e| Loader("stage".to_string(), stage_id.to_string(), e))?;
351
352 let mut reporter = self
353 .reporter
354 .stage(stage_id)
355 .await
356 .map_err(|e| Reporter("stage".to_string(), stage_id.to_string(), e))?;
357
358 reporter
359 .start(Utc::now())
360 .await
361 .map_err(|e| Reporter("stage".to_string(), stage_id.to_string(), e))?;
362
363 let result = self
364 .stage_run_io(stage_id, loader.as_mut(), reporter.as_mut(), concurrency)
365 .await;
366
367 let stage_asset = if let Err(e) = result {
368 error!("stage Err {}-{}, {:?}", self.id, stage_id, e);
369 StageAssetStruct::new(
370 stage_id.to_string(),
371 start,
372 Utc::now(),
373 StageState::Err(Box::new(e)),
374 )
375 } else {
376 match &self.stage_state {
377 StageState::Ok => {
378 info!("stage Ok {}-{}", self.id.clone(), stage_id);
379 StageAssetStruct::new(stage_id.to_string(), start, Utc::now(), StageState::Ok)
380 }
381 StageState::Fail(c) => {
382 warn!("stage Fail {}-{}", self.id.clone(), stage_id);
383 StageAssetStruct::new(
384 stage_id.to_string(),
385 start,
386 Utc::now(),
387 StageState::Fail(c.clone()),
388 )
389 }
390 StageState::Err(_) => unreachable!(),
391 }
392 };
393
394 reporter
395 .end(&stage_asset)
396 .await
397 .map_err(|e| Reporter("stage".to_string(), stage_id.to_string(), e))?;
398
399 match stage_asset.state() {
400 StageState::Ok => Ok(()),
401 StageState::Fail(_) => Ok(()),
402 StageState::Err(e) => Err(Unknown(e.to_string())),
403 }
404 }
405
406 async fn stage_run_io(
407 &mut self,
408 stage_id: &str,
409 loader: &mut dyn StageLoader,
410 reporter: &mut dyn StageReporter,
411 concurrency: usize,
412 ) -> Result<(), Error> {
413 let mut load_times = 0;
414 loop {
415 let case_data_vec: Vec<(String, Value)> = loader
416 .load(concurrency)
417 .await
418 .map_err(|e| Loader("stage".to_string(), stage_id.to_string(), e))?;
419 load_times = load_times + 1;
420 if case_data_vec.len() == 0 {
421 return if load_times == 1 {
422 Err(CaseEmpty(stage_id.to_string()))
423 } else {
424 trace!("stage exhaust data {}, {}", self.id, stage_id);
425 Ok(())
426 };
427 }
428
429 trace!(
430 "stage load data {}, {}, {}",
431 self.id,
432 stage_id,
433 case_data_vec.len()
434 );
435
436 let case_asset_vec = self.case_data_vec_run(case_data_vec, concurrency).await;
437 let first_fail = case_asset_vec.iter().find(|ca| !ca.state().is_ok());
438 if first_fail.is_some() {
439 let cause_case = first_fail.unwrap();
440 let cause = match cause_case.state() {
441 CaseState::Err(_) => format!("case: {}", cause_case.id()),
442 CaseState::Fail(v) => {
443 let last_step_id = v
444 .last()
445 .map(|s| s.id().to_string())
446 .or_else(|| Some(String::new()))
447 .unwrap();
448 format!("step: {}", last_step_id)
449 }
450 CaseState::Ok(_) => String::new(),
451 };
452 self.stage_state = StageState::Fail(cause.clone());
453 self.task_state = TaskState::Fail(cause.clone());
454 }
455 reporter
456 .report(&case_asset_vec)
457 .await
458 .map_err(|e| Reporter("stage".to_string(), stage_id.to_string(), e))?;
459 }
460 }
461
462 async fn case_data_vec_run(
463 &mut self,
464 case_vec: Vec<(String, Value)>,
465 concurrency: usize,
466 ) -> Vec<Box<dyn CaseAsset>> {
467 let ca_vec = self.case_arg_vec(case_vec);
468
469 let mut case_join_result_vec = Vec::<Result<Box<dyn CaseAsset>, JoinError>>::new();
470 let mut futures = vec![];
471 for ca in ca_vec {
472 let f = case_spawn(self.app.clone(), ca);
473 futures.push(f);
474 if futures.len() >= concurrency {
475 let case_asset = join_all(futures.split_off(0)).await;
476 case_join_result_vec.extend(case_asset);
477 }
478 }
479 if !futures.is_empty() {
480 let case_asset = join_all(futures).await;
481 case_join_result_vec.extend(case_asset);
482 }
483
484 let mut case_asset_vec = Vec::with_capacity(case_join_result_vec.len());
485 for res in case_join_result_vec {
486 case_asset_vec.push(res.unwrap());
487 }
488 case_asset_vec
489 }
490
491 fn case_arg_vec<'p>(&self, data: Vec<(String, Value)>) -> Vec<CaseArgStruct> {
492 let vec = data
493 .into_iter()
494 .map(|(id, d)| {
495 CaseArgStruct::new(
496 self.flow.clone(),
497 self.step_vec.clone(),
498 d,
499 self.pre_ctx.clone(),
500 self.def_ctx.clone(),
501 self.id.clone(),
502 self.stage_id.clone(),
503 Arc::new(self.stage_round_no.to_string()),
504 id,
505 )
506 })
507 .collect();
508 return vec;
509 }
510}
511
512async fn pre_arg(
513 flow: Arc<Flow>,
514 task_id: Arc<TaskIdSimple>,
515 def_ctx: Option<Arc<Map>>,
516 pre_step_vec: Arc<TailDropVec<(String, StepRunner)>>,
517) -> Result<CaseArgStruct, Error> {
518 Ok(CaseArgStruct::new(
519 flow.clone(),
520 pre_step_vec,
521 Value::Null,
522 None,
523 def_ctx,
524 task_id.clone(),
525 Arc::new("pre".into()),
526 Arc::new("1".into()),
527 "1".into(),
528 ))
529}
530
531async fn pre_ctx_create(sa_vec: &Vec<Box<dyn StepAsset>>) -> Map {
532 let mut pre_ctx = Map::new();
533 pre_ctx.insert("step".to_owned(), Value::Object(Map::new()));
534 for sa in sa_vec.iter() {
535 if let StepState::Ok(av) = sa.state() {
536 let mut am = Map::new();
537 for a in av.iter() {
538 am.insert(a.id().to_string(), action_asset_to_value(a.as_ref()));
539 }
540 pre_ctx["step"][sa.id().step()] = Value::Object(am);
541 }
542 }
543 pre_ctx
544}
545
546
547async fn step_vec_create(
548 app: &dyn App,
549 flow: &Flow,
550 step_id_vec: Vec<String>,
551 task_id: Arc<TaskIdSimple>,
552 chord: Arc<ChordStruct>,
553) -> Result<Vec<(String, StepRunner)>, Error> {
554 let mut step_vec = vec![];
555 let case_id = Arc::new(CaseIdStruct::new(
556 task_id,
557 Arc::new("create".to_string()),
558 Arc::new("1".to_string()),
559 "pre".to_string(),
560 ));
561 for sid in step_id_vec {
562 let mut arg = ArgStruct::new(
563 app,
564 flow,
565 RenderContext::wraps(Value::Object(Map::with_capacity(0)))
566 .map_err(|e| Step(sid.clone(), Box::new(e)))?,
567 case_id.clone(),
568 sid.clone(),
569 );
570
571 let pr = StepRunner::new(chord.clone(), &mut arg)
572 .await
573 .map_err(|e| Step(sid.clone(), Box::new(e)))?;
574 step_vec.push((sid, pr));
575 }
576 Ok(step_vec)
577}
578
579async fn case_run(flow_ctx: &dyn App, case_arg: CaseArgStruct) -> Box<dyn CaseAsset> {
580 Box::new(case::run(flow_ctx, case_arg).await)
581}
582
583fn case_spawn(flow_ctx: Arc<dyn App>, case_arg: CaseArgStruct) -> JoinHandle<Box<dyn CaseAsset>> {
584 spawn(case_run_arc(flow_ctx, case_arg))
585}
586
587async fn case_run_arc(flow_ctx: Arc<dyn App>, case_arg: CaseArgStruct) -> Box<dyn CaseAsset> {
588 CTX_ID
589 .scope(
590 case_arg.id().to_string(),
591 case_run(flow_ctx.as_ref(), case_arg),
592 )
593 .await
594}