chord_flow/flow/step/
mod.rs1use std::error::Error as StdError;
2use std::sync::Arc;
3
4use chrono::{DateTime, Utc};
5use log::{debug, error, info, trace, warn};
6
7use chord_core::action::{Action, Asset, Chord};
8use chord_core::collection::TailDropVec;
9use chord_core::step::{ActionAsset, ActionState, StepId};
10use chord_core::value::Value;
11use Error::*;
12use res::StepAssetStruct;
13
14use crate::flow::step::arg::{ArgStruct, ChordStruct};
15use crate::flow::step::res::ActionAssetStruct;
16
17pub mod arg;
18pub mod res;
19
20#[derive(thiserror::Error, Debug)]
21pub enum Error {
22 #[error("unsupported action `{0}`")]
23 Unsupported(String),
24
25 #[error("action `{0}.{1}` create:\n{1}")]
26 Create(String, String, Box<dyn StdError + Sync + Send>),
27}
28
29pub struct StepRunner {
30 chord: Arc<ChordStruct>,
31 action_vec: Arc<TailDropVec<(String, Box<dyn Action>)>>,
32}
33
34impl StepRunner {
35 pub async fn new(
36 chord: Arc<ChordStruct>,
37 arg: &mut ArgStruct<'_, '_>,
38 ) -> Result<StepRunner, Error> {
39 trace!("step new {}", arg.step_id());
40 let obj = arg.flow().step_obj(arg.step_id().step());
41 let aid_vec: Vec<String> = obj.iter().map(|(aid, _)| aid.to_string()).collect();
42 let mut action_vec = Vec::with_capacity(obj.len());
43
44 for aid in aid_vec {
45 arg.aid(aid.as_str());
46 let func = arg.flow().step_action_func(arg.step_id().step(), aid.as_str());
47 let action = chord
48 .creator(func.into())
49 .ok_or_else(|| Unsupported(func.into()))?
50 .create(chord.as_ref(), arg)
51 .await
52 .map_err(|e| Create(arg.step_id().step().to_string(), aid.to_string(), e))?;
53 action_vec.push((aid.to_string(), action));
54 }
55
56 Ok(StepRunner {
57 chord,
58 action_vec: Arc::new(TailDropVec::from(action_vec)),
59 })
60 }
61
62 pub async fn run(&self, arg: &mut ArgStruct<'_, '_>) -> StepAssetStruct {
63 trace!("step run {}", arg.step_id());
64 let start = Utc::now();
65 let mut asset_vec = Vec::with_capacity(self.action_vec.len());
66 let mut success = true;
67 for (aid, action) in self.action_vec.iter() {
68 let key: &str = aid;
69 let action: &Box<dyn Action> = action;
70 arg.aid(key);
71 let explain = action
72 .explain(self.chord.as_ref(), arg)
73 .await
74 .unwrap_or(Value::Null);
75 let start = Utc::now();
76 let value = action.execute(self.chord.as_ref(), arg).await;
77 let end = Utc::now();
78 match value {
79 Ok(_) => {
80 let asset = action_asset(aid, start, end, explain, value);
81 if let ActionState::Ok(v) = asset.state() {
82 arg.context_mut()
83 .data_mut()
84 .insert(asset.id().to_string(), v.to_value());
85 }
86 asset_vec.push(asset);
87 }
88
89 Err(_) => {
90 let asset = action_asset(aid, start, end, explain, value);
91 asset_vec.push(asset);
92 success = false;
93 break;
94 }
95 }
96 }
97
98 if success {
99 for ass in asset_vec.iter() {
100 if let ActionState::Ok(v) = ass.state() {
101 debug!(
102 "{}:\n{}\n>>>\n{}",
103 ass.id(),
104 explain_string(ass.explain()),
105 v.to_value()
106 );
107 }
108 }
109 info!("step Ok {}", arg.step_id());
110 } else {
111 for ass in asset_vec.iter() {
112 if let ActionState::Ok(v) = ass.state() {
113 warn!(
114 "{}:\n{}\n>>>\n{}",
115 ass.id(),
116 explain_string(ass.explain()),
117 v.to_value()
118 );
119 } else if let ActionState::Err(e) = ass.state() {
120 error!(
121 "{}:\n{}\n>>>\n{}",
122 ass.id(),
123 explain_string(ass.explain()),
124 e
125 );
126 }
127 }
128 error!("step Err {}", arg.step_id());
129 }
130
131 StepAssetStruct::new(Clone::clone(arg.step_id()), start, Utc::now(), asset_vec)
132 }
133}
134
135fn action_asset(
136 aid: &str,
137 start: DateTime<Utc>,
138 end: DateTime<Utc>,
139 explain: Value,
140 value: Result<Asset, chord_core::action::Error>,
141) -> ActionAssetStruct {
142 match value {
143 Ok(a) => {
144 ActionAssetStruct::new(aid.to_string(),
145 start,
146 end,
147 explain,
148 ActionState::Ok(a),
149 )
150 }
151 Err(e) => {
152 ActionAssetStruct::new(aid.to_string(),
153 start,
154 end,
155 explain,
156 ActionState::Err(e))
157 }
158 }
159}
160
161fn explain_string(exp: &Value) -> String {
162 if let Value::String(txt) = exp {
163 txt.to_string()
164 } else {
165 exp.to_string()
166 }
167}
168
169pub fn action_asset_to_value(action_asset: &dyn ActionAsset) -> Value {
170 match action_asset.state() {
171 ActionState::Ok(v) => {
172 v.to_value()
173 }
174 ActionState::Err(e) => {
175 Value::String(e.to_string())
176 }
177 }
178}