acts 0.17.2

a fast, lightweight, extensible workflow engine
Documentation
use crate::event::EventAction;
use crate::{Act, Engine, EngineBuilder, MessageState, Vars, Workflow, utils};
use serde::Deserialize;
use serde_json::json;

/// Checks that an engine returned by `start()` reports `is_running()`.
#[tokio::test]
async fn engine_start() {
    let started = Engine::new().start();
    let running = started.is_running();
    assert!(running);
}

/// Checks that an `on_message` handler sees a message of type `"act"` for the
/// workflow's `irq` act and that the message carries the act key `"test"`.
#[tokio::test]
async fn engine_event_on_message() {
    let engine = Engine::new().start();

    // The signal is created from an empty string; the handler later overwrites
    // its data with the key of the received message.
    let sig = engine.signal(String::new());
    let notifier = sig.clone();

    let model_id = utils::longid();
    let workflow = Workflow::new()
        .with_id(&model_id)
        .with_step(|step| step.with_act(Act::irq(|act| act.with_key("test"))));

    engine.channel().on_message(move |msg| {
        // Anything that is not an "act" message is ignored.
        if !msg.is_type("act") {
            return;
        }
        notifier.update(|data| *data = msg.key.clone());
        notifier.close();
    });

    let executor = engine.executor();
    executor.model().deploy(&workflow).unwrap();

    // Start a process of the deployed model under a freshly generated pid.
    let pid = utils::longid();
    let mut options = Vars::new();
    options.insert(String::from("pid"), json!(pid));
    executor.proc().start(&workflow.id, &options).unwrap();

    assert_eq!(sig.recv().await, "test");
}

/// Checks that the `on_start` handler is invoked after a process is started and
/// that the event's `model.id` equals the id of the deployed workflow.
#[tokio::test]
async fn engine_event_on_start() {
    let engine = Engine::new().start();
    let sig = engine.signal(String::new());
    let notifier = sig.clone();

    let model_id = utils::longid();
    let workflow = Workflow::new()
        .with_id(&model_id)
        .with_step(|step| step.with_act(Act::irq(|act| act.with_key("test"))));

    // Hand the model id seen by the handler back to the test body.
    engine.channel().on_start(move |evt| {
        notifier.send(evt.model.id.clone());
    });

    let executor = engine.executor();
    executor.model().deploy(&workflow).unwrap();

    let pid = utils::longid();
    let mut options = Vars::new();
    options.insert(String::from("pid"), json!(pid));
    executor.proc().start(&workflow.id, &options).unwrap();

    let started_model = sig.recv().await;
    assert_eq!(started_model, model_id);
}

/// Checks that the `on_complete` handler is invoked for a workflow with a single
/// plain step and that the event's `model.id` matches the deployed workflow.
#[tokio::test]
async fn engine_event_on_complete() {
    let engine = Engine::new().start();
    let sig = engine.signal(false);
    let notifier = sig.clone();

    let model_id = utils::longid();
    let workflow = Workflow::new()
        .with_id(&model_id)
        .with_step(|step| step.with_id("step1"));

    // The handler owns `model_id` and reports whether the event matches it.
    engine.channel().on_complete(move |evt| {
        notifier.send(evt.model.id == model_id);
    });

    let executor = engine.executor();
    executor.model().deploy(&workflow).unwrap();

    let pid = utils::longid();
    let mut options = Vars::new();
    options.insert(String::from("pid"), json!(pid));
    executor.proc().start(&workflow.id, &options).unwrap();

    let matched = sig.recv().await;
    assert!(matched);
}

#[tokio::test]
async fn engine_event_on_error() {
    let engine = Engine::new().start();
    let mid = utils::longid();
    let workflow = Workflow::new().with_id(&mid).with_step(|step| {
        step.with_id("step1")
            .with_act(Act::irq(|a| a.with_key("act1")))
    });

    let sig = engine.signal(false);
    let s1 = sig.clone();
    engine.channel().on_error(move |e| {
        s1.send(e.model.id == mid);
    });

    engine.channel().on_message(move |e| {
        let mut options = Vars::new();
        options.insert("uid".to_string(), json!("u1"));
        options.set("ecode", "err1");

        if e.is_key("act1") && e.is_state(MessageState::Created) {
            e.do_action(&e.pid, &e.tid, EventAction::Error, &options)
                .unwrap();
        }
    });

    let executor = engine.executor();
    executor.model().deploy(&workflow).unwrap();

    let mut options = Vars::new();
    options.insert("pid".to_string(), json!(utils::longid()));
    executor.proc().start(&workflow.id, &options).unwrap();
    let ret = sig.recv().await;
    assert!(ret);
}

/// Checks that a `Workflow` assembled with the builder methods exposes its
/// name, that `step("step1")` finds the first step, and that the step holds
/// both branches that were added to it.
#[tokio::test]
async fn engine_model_create() {
    let model = Workflow::new()
        .with_name("w1")
        .with_input("v", 0.into())
        .with_step(|s| {
            s.with_id("step1")
                .with_name("step1")
                // Two branches with complementary conditions on the input `v`.
                .with_branch(|b| {
                    b.with_if("{{ v > 100 }}")
                        .with_step(|inner| inner.with_name("step3"))
                })
                .with_branch(|b| {
                    b.with_if("{{ v <= 100 }}")
                        .with_step(|inner| inner.with_name("step4"))
                })
        })
        .with_step(|s| s.with_name("step2"));

    assert_eq!(model.name, "w1");

    let first = model.step("step1").unwrap();
    assert_eq!(first.name, "step1");
    assert_eq!(first.branches.len(), 2);
}

/// Checks that `cache_size(100)` on the builder is reported by
/// `config().cache_cap()` on the built engine.
#[tokio::test]
async fn engine_build_cache_size() {
    let built = EngineBuilder::new().cache_size(100).build().await;
    let engine = built.unwrap().start();

    let config = engine.config();
    assert_eq!(config.cache_cap(), 100);
}

/// Checks that the first argument of `log(..)` on the builder is reported as
/// `config().log().dir` on the built engine.
#[tokio::test]
async fn engine_build_log_dir() {
    let built = EngineBuilder::new().log("test", "INFO").build().await;
    let engine = built.unwrap().start();

    let config = engine.config();
    assert_eq!(config.log().dir, "test");
}

/// Checks that the second argument of `log(..)` on the builder is reported as
/// `config().log().level` on the built engine.
#[tokio::test]
async fn engine_build_log_level() {
    let built = EngineBuilder::new().log("log", "DEBUG").build().await;
    let engine = built.unwrap().start();

    let config = engine.config();
    assert_eq!(config.log().level, "DEBUG");
}

/// Checks that `tick_interval_secs(10)` on the builder is reported by
/// `config().tick_interval_secs()` on the built engine.
#[tokio::test]
async fn engine_build_tick_interval_secs() {
    let built = EngineBuilder::new().tick_interval_secs(10).build().await;
    let engine = built.unwrap().start();

    let config = engine.config();
    assert_eq!(config.tick_interval_secs(), 10);
}

/// Checks that `max_message_retry_times(100)` on the builder is reported by
/// `config().max_message_retry_times()` on the built engine.
#[tokio::test]
async fn engine_build_max_message_retry_times() {
    let built = EngineBuilder::new()
        .max_message_retry_times(100)
        .build()
        .await;
    let engine = built.unwrap().start();

    let config = engine.config();
    assert_eq!(config.max_message_retry_times(), 100);
}

/// Checks that an engine can be started and dropped, and that a second engine
/// can be started and dropped afterwards, without panicking.
#[tokio::test]
async fn engine_drop() {
    // Two rounds: the second engine is only created after the first is dropped.
    for _ in 0..2 {
        let engine = Engine::new().start();
        drop(engine);
    }
}

/// Checks that a builder without an explicit config source picks up the
/// values written to `test/acts.toml` (cache capacity, tick interval and the
/// `[log]` section).
///
/// NOTE(review): the test relies on the exact contents of `test/acts.toml` at
/// the moment `build()` runs. Any other test that writes this file
/// concurrently can change what the builder reads, so keep the file exclusive
/// to this test.
#[tokio::test]
async fn engine_build_config_default() {
    // `create_dir_all` returns `Ok` when the directory already exists, so this
    // cannot panic when a test running in parallel creates `test/` first
    // (the former `exists()` + `create_dir().unwrap()` pair could).
    std::fs::create_dir_all("test").unwrap();

    let path = "test/acts.toml";
    // `fs::write` creates the file or replaces its whole contents, so no
    // separate `exists()` + `remove_file()` step (another check-then-act race)
    // is needed beforehand.
    std::fs::write(
        path,
        r#"
        cache_cap =  100
        tick_interval_secs = 200

        [log]
        dir = "data"
        level = "INFO"
        "#,
    )
    .unwrap();

    let engine = EngineBuilder::new().build().await.unwrap();
    let config = engine.config();
    assert_eq!(config.cache_cap(), 100);
    assert_eq!(config.log().dir, "data");
    assert_eq!(config.log().level, "INFO");
    assert_eq!(config.tick_interval_secs(), 200);
}

/// Checks that `set_config_source` makes the builder load its settings from
/// the given TOML file (`test/test.toml`) instead of relying on defaults.
#[tokio::test]
async fn engine_build_config_set_source() {
    let dir = std::path::Path::new("test");
    if !dir.exists() {
        // The result is discarded, so a failed creation does not abort here.
        let _ = std::fs::create_dir(dir);
    }

    let source = std::path::Path::new("test/test.toml");
    if source.exists() {
        std::fs::remove_file(source).unwrap();
    }

    let contents = r#"
        cache_cap =  100
        tick_interval_secs = 200
        default_outputs = [ 
            "data"
        ]

        [log]
        dir = "data"
        level = "INFO"
        "#;
    std::fs::write(source, contents).unwrap();

    let built = EngineBuilder::new()
        .set_config_source(source)
        .build()
        .await;
    let engine = built.unwrap();

    let config = engine.config();
    assert_eq!(config.cache_cap(), 100);
    assert_eq!(config.log().dir, "data");
    assert_eq!(config.log().level, "INFO");
    assert_eq!(config.tick_interval_secs(), 200);
}

/// Checks that a user-defined `[custom]` section of the config file can be
/// deserialized through `config().get::<T>("custom")`, and that an `Option`
/// field missing from the file comes back as `None`.
///
/// The config is written to a dedicated file and loaded with
/// `set_config_source` rather than through `test/acts.toml`, so this test does
/// not overwrite a config file that other tests write and assert on while the
/// test runner executes them in parallel.
#[tokio::test]
async fn engine_get_custom_config() {
    /// Shape of the `[custom]` table written below.
    #[derive(Deserialize)]
    struct Custom {
        myint: i32,
        mystr: String,
        // Deliberately absent from the TOML to exercise the `None` case.
        my_option: Option<i32>,
    }

    // Make sure the target directory exists even when this test runs alone in
    // a clean checkout; `create_dir_all` is a no-op if it is already there.
    std::fs::create_dir_all("test").unwrap();

    let path = std::path::Path::new("test/custom.toml");
    // `fs::write` creates the file or replaces its whole contents.
    std::fs::write(
        path,
        r#"
        [custom]
        myint = 100
        mystr = "myData"
        "#,
    )
    .unwrap();

    let engine = EngineBuilder::new()
        .set_config_source(path)
        .build()
        .await
        .unwrap();
    let custom = engine.config().get::<Custom>("custom").unwrap();
    assert_eq!(custom.myint, 100);
    assert_eq!(custom.mystr, "myData");
    assert_eq!(custom.my_option, None);
}