chord-flow 0.1.21

async parallel case executor
Documentation
use std::error::Error as StdError;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use log::{debug, error, info, trace, warn};

use chord_core::action::{Action, Asset, Chord};
use chord_core::collection::TailDropVec;
use chord_core::step::{ActionAsset, ActionState, StepId};
use chord_core::value::Value;
use Error::*;
use res::StepAssetStruct;

use crate::flow::step::arg::{ArgStruct, ChordStruct};
use crate::flow::step::res::ActionAssetStruct;

pub mod arg;
pub mod res;

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("unsupported action `{0}`")]
    Unsupported(String),

    #[error("action `{0}.{1}` create:\n{1}")]
    Create(String, String, Box<dyn StdError + Sync + Send>),
}

pub struct StepRunner {
    chord: Arc<ChordStruct>,
    action_vec: Arc<TailDropVec<(String, Box<dyn Action>)>>,
}

impl StepRunner {
    pub async fn new(
        chord: Arc<ChordStruct>,
        arg: &mut ArgStruct<'_, '_>,
    ) -> Result<StepRunner, Error> {
        trace!("step new {}", arg.step_id());
        let obj = arg.flow().step_obj(arg.step_id().step());
        let aid_vec: Vec<String> = obj.iter().map(|(aid, _)| aid.to_string()).collect();
        let mut action_vec = Vec::with_capacity(obj.len());

        for aid in aid_vec {
            arg.aid(aid.as_str());
            let func = arg.flow().step_action_func(arg.step_id().step(), aid.as_str());
            let action = chord
                .creator(func.into())
                .ok_or_else(|| Unsupported(func.into()))?
                .create(chord.as_ref(), arg)
                .await
                .map_err(|e| Create(arg.step_id().step().to_string(), aid.to_string(), e))?;
            action_vec.push((aid.to_string(), action));
        }

        Ok(StepRunner {
            chord,
            action_vec: Arc::new(TailDropVec::from(action_vec)),
        })
    }

    pub async fn run(&self, arg: &mut ArgStruct<'_, '_>) -> StepAssetStruct {
        trace!("step run {}", arg.step_id());
        let start = Utc::now();
        let mut asset_vec = Vec::with_capacity(self.action_vec.len());
        let mut success = true;
        for (aid, action) in self.action_vec.iter() {
            let key: &str = aid;
            let action: &Box<dyn Action> = action;
            arg.aid(key);
            let explain = action
                .explain(self.chord.as_ref(), arg)
                .await
                .unwrap_or(Value::Null);
            let start = Utc::now();
            let value = action.execute(self.chord.as_ref(), arg).await;
            let end = Utc::now();
            match value {
                Ok(_) => {
                    let asset = action_asset(aid, start, end, explain, value);
                    if let ActionState::Ok(v) = asset.state() {
                        arg.context_mut()
                            .data_mut()
                            .insert(asset.id().to_string(), v.to_value());
                    }
                    asset_vec.push(asset);
                }

                Err(_) => {
                    let asset = action_asset(aid, start, end, explain, value);
                    asset_vec.push(asset);
                    success = false;
                    break;
                }
            }
        }

        if success {
            for ass in asset_vec.iter() {
                if let ActionState::Ok(v) = ass.state() {
                    debug!(
                        "{}:\n{}\n>>>\n{}",
                        ass.id(),
                        explain_string(ass.explain()),
                        v.to_value()
                    );
                }
            }
            info!("step Ok   {}", arg.step_id());
        } else {
            for ass in asset_vec.iter() {
                if let ActionState::Ok(v) = ass.state() {
                    warn!(
                        "{}:\n{}\n>>>\n{}",
                        ass.id(),
                        explain_string(ass.explain()),
                        v.to_value()
                    );
                } else if let ActionState::Err(e) = ass.state() {
                    error!(
                        "{}:\n{}\n>>>\n{}",
                        ass.id(),
                        explain_string(ass.explain()),
                        e
                    );
                }
            }
            error!("step Err {}", arg.step_id());
        }

        StepAssetStruct::new(Clone::clone(arg.step_id()), start, Utc::now(), asset_vec)
    }
}

fn action_asset(
    aid: &str,
    start: DateTime<Utc>,
    end: DateTime<Utc>,
    explain: Value,
    value: Result<Asset, chord_core::action::Error>,
) -> ActionAssetStruct {
    match value {
        Ok(a) => {
            ActionAssetStruct::new(aid.to_string(),
                                   start,
                                   end,
                                   explain,
                                   ActionState::Ok(a),
            )
        }
        Err(e) => {
            ActionAssetStruct::new(aid.to_string(),
                                   start,
                                   end,
                                   explain,
                                   ActionState::Err(e))
        }
    }
}

fn explain_string(exp: &Value) -> String {
    if let Value::String(txt) = exp {
        txt.to_string()
    } else {
        exp.to_string()
    }
}

pub fn action_asset_to_value(action_asset: &dyn ActionAsset) -> Value {
    match action_asset.state() {
        ActionState::Ok(v) => {
            v.to_value()
        }
        ActionState::Err(e) => {
            Value::String(e.to_string())
        }
    }
}