chord_flow/flow/step/
mod.rs

1use std::error::Error as StdError;
2use std::sync::Arc;
3
4use chrono::{DateTime, Utc};
5use log::{debug, error, info, trace, warn};
6
7use chord_core::action::{Action, Asset, Chord};
8use chord_core::collection::TailDropVec;
9use chord_core::step::{ActionAsset, ActionState, StepId};
10use chord_core::value::Value;
11use Error::*;
12use res::StepAssetStruct;
13
14use crate::flow::step::arg::{ArgStruct, ChordStruct};
15use crate::flow::step::res::ActionAssetStruct;
16
17pub mod arg;
18pub mod res;
19
20#[derive(thiserror::Error, Debug)]
21pub enum Error {
22    #[error("unsupported action `{0}`")]
23    Unsupported(String),
24
25    #[error("action `{0}.{1}` create:\n{1}")]
26    Create(String, String, Box<dyn StdError + Sync + Send>),
27}
28
29pub struct StepRunner {
30    chord: Arc<ChordStruct>,
31    action_vec: Arc<TailDropVec<(String, Box<dyn Action>)>>,
32}
33
34impl StepRunner {
35    pub async fn new(
36        chord: Arc<ChordStruct>,
37        arg: &mut ArgStruct<'_, '_>,
38    ) -> Result<StepRunner, Error> {
39        trace!("step new {}", arg.step_id());
40        let obj = arg.flow().step_obj(arg.step_id().step());
41        let aid_vec: Vec<String> = obj.iter().map(|(aid, _)| aid.to_string()).collect();
42        let mut action_vec = Vec::with_capacity(obj.len());
43
44        for aid in aid_vec {
45            arg.aid(aid.as_str());
46            let func = arg.flow().step_action_func(arg.step_id().step(), aid.as_str());
47            let action = chord
48                .creator(func.into())
49                .ok_or_else(|| Unsupported(func.into()))?
50                .create(chord.as_ref(), arg)
51                .await
52                .map_err(|e| Create(arg.step_id().step().to_string(), aid.to_string(), e))?;
53            action_vec.push((aid.to_string(), action));
54        }
55
56        Ok(StepRunner {
57            chord,
58            action_vec: Arc::new(TailDropVec::from(action_vec)),
59        })
60    }
61
62    pub async fn run(&self, arg: &mut ArgStruct<'_, '_>) -> StepAssetStruct {
63        trace!("step run {}", arg.step_id());
64        let start = Utc::now();
65        let mut asset_vec = Vec::with_capacity(self.action_vec.len());
66        let mut success = true;
67        for (aid, action) in self.action_vec.iter() {
68            let key: &str = aid;
69            let action: &Box<dyn Action> = action;
70            arg.aid(key);
71            let explain = action
72                .explain(self.chord.as_ref(), arg)
73                .await
74                .unwrap_or(Value::Null);
75            let start = Utc::now();
76            let value = action.execute(self.chord.as_ref(), arg).await;
77            let end = Utc::now();
78            match value {
79                Ok(_) => {
80                    let asset = action_asset(aid, start, end, explain, value);
81                    if let ActionState::Ok(v) = asset.state() {
82                        arg.context_mut()
83                            .data_mut()
84                            .insert(asset.id().to_string(), v.to_value());
85                    }
86                    asset_vec.push(asset);
87                }
88
89                Err(_) => {
90                    let asset = action_asset(aid, start, end, explain, value);
91                    asset_vec.push(asset);
92                    success = false;
93                    break;
94                }
95            }
96        }
97
98        if success {
99            for ass in asset_vec.iter() {
100                if let ActionState::Ok(v) = ass.state() {
101                    debug!(
102                        "{}:\n{}\n>>>\n{}",
103                        ass.id(),
104                        explain_string(ass.explain()),
105                        v.to_value()
106                    );
107                }
108            }
109            info!("step Ok   {}", arg.step_id());
110        } else {
111            for ass in asset_vec.iter() {
112                if let ActionState::Ok(v) = ass.state() {
113                    warn!(
114                        "{}:\n{}\n>>>\n{}",
115                        ass.id(),
116                        explain_string(ass.explain()),
117                        v.to_value()
118                    );
119                } else if let ActionState::Err(e) = ass.state() {
120                    error!(
121                        "{}:\n{}\n>>>\n{}",
122                        ass.id(),
123                        explain_string(ass.explain()),
124                        e
125                    );
126                }
127            }
128            error!("step Err {}", arg.step_id());
129        }
130
131        StepAssetStruct::new(Clone::clone(arg.step_id()), start, Utc::now(), asset_vec)
132    }
133}
134
135fn action_asset(
136    aid: &str,
137    start: DateTime<Utc>,
138    end: DateTime<Utc>,
139    explain: Value,
140    value: Result<Asset, chord_core::action::Error>,
141) -> ActionAssetStruct {
142    match value {
143        Ok(a) => {
144            ActionAssetStruct::new(aid.to_string(),
145                                   start,
146                                   end,
147                                   explain,
148                                   ActionState::Ok(a),
149            )
150        }
151        Err(e) => {
152            ActionAssetStruct::new(aid.to_string(),
153                                   start,
154                                   end,
155                                   explain,
156                                   ActionState::Err(e))
157        }
158    }
159}
160
161fn explain_string(exp: &Value) -> String {
162    if let Value::String(txt) = exp {
163        txt.to_string()
164    } else {
165        exp.to_string()
166    }
167}
168
169pub fn action_asset_to_value(action_asset: &dyn ActionAsset) -> Value {
170    match action_asset.state() {
171        ActionState::Ok(v) => {
172            v.to_value()
173        }
174        ActionState::Err(e) => {
175            Value::String(e.to_string())
176        }
177    }
178}