acts-next 0.14.4

a fast, tiny, extensiable workflow engine
Documentation
use crate::{
    scheduler::tests::create_proc_signal2,
    utils::{self, consts},
    Act, Event, Message, MessageState, Signal, StmtBuild, Workflow,
};
use serde_json::json;

#[tokio::test]
async fn sch_step_run_msg() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_run(r#"act.msg({ key: "msg1" })"#)
    });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("msg1") {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_req() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_run(r#"act.irq({ key: "act1" })"#)
    });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("act1") {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_chain_array() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_run(r#"act.chain({ in: '["u1"]', then: [ { act: "msg", key: "msg1" }]})"#)
    });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("msg1") {
            s.send(e.inputs.get::<String>(consts::ACT_VALUE).unwrap() == "u1");
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_chain_var() {
    let workflow = Workflow::new()
        .with_input("a", json!(r#"["u1"]"#))
        .with_step(|step| {
            step.with_id("step1")
                .with_run(r#"act.chain({ in: $("a"), then: [ { act: "msg", key: "msg1" } ] })"#)
        });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("msg1") {
            s.send(e.inputs.get::<String>(consts::ACT_VALUE).unwrap() == "u1");
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_each_array() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_run(r#"act.each({ in: '["u1"]', then: [ { act: "msg", key: "msg1" } ] })"#)
    });

    let ret = run_test(&workflow, |e, s| {
        if e.is_key("msg1") {
            s.send(e.inputs.get::<String>(consts::ACT_VALUE).unwrap() == "u1");
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_each_var() {
    let workflow = Workflow::new()
        .with_input("a", json!(r#"["u1"]"#))
        .with_step(|step| {
            step.with_id("step1")
                .with_run(r#"act.each({ in: $("a"), then: [ { act: "msg", key: "msg1" } ] })"#)
        });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("msg1") {
            s.send(e.inputs.get::<String>(consts::ACT_VALUE).unwrap() == "u1");
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_block() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_run(r#"act.block({ then: [{ act: "msg", key: "msg1" }] })"#)
    });

    let ret = run_test(&workflow, |e, s| {
        if e.is_key("msg1") {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_call() {
    let workflow = Workflow::new()
        .with_step(|step| step.with_id("step1").with_run(r#"act.call({ key: "m1" })"#));
    let dep = Workflow::new().with_id("m1");
    let ret = run_test_dep(&workflow, &dep, |e, s| {
        if e.is_key("m1") {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_push() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_run(r#"act.push({ act: "irq", key: "act1" })"#)
    });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("act1") {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_expose() {
    let workflow = Workflow::new()
        .with_step(|step| step.with_id("step1").with_run(r#" act.expose("a", 100);"#));
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("step1") && e.is_state(MessageState::Completed) {
            s.send(e.outputs.get::<i32>("a").unwrap() == 100);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_abort() {
    let workflow =
        Workflow::new().with_step(|step| step.with_id("step1").with_run(r#"act.abort();"#));
    let ret: bool = run_test(&workflow, |e, s| {
        if e.is_key("step1") && e.is_state(MessageState::Aborted) {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_fail() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_run(r#"act.fail("err1", "error message");"#)
    });
    let ret: bool = run_test(&workflow, |e, s| {
        if e.is_key("step1") && e.is_state(MessageState::Error) {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_skip() {
    let workflow =
        Workflow::new().with_step(|step| step.with_id("step1").with_run(r#"act.skip();"#));

    let ret: bool = run_test(&workflow, |e, s| {
        if e.is_key("step1") && e.is_state(MessageState::Skipped) {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_back() {
    let workflow = Workflow::new()
        .with_step(|step| step.with_id("step1"))
        .with_step(|step| step.with_id("step2").with_run(r#"act.back("step1");"#));
    let ret: bool = run_test(&workflow, |e, s| {
        if e.is_key("step2") && e.is_state(MessageState::Backed) {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_state() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1").with_run(
            r#"
    let state = act.state();
    act.expose("state", state);
    "#,
        )
    });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("step1") && e.is_state(MessageState::Completed) {
            s.send(e.outputs.get::<String>("state").unwrap() == "running");
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_set_value() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_output("my_data", json!(null))
            .with_run(r#"act.set("my_data", "abc");"#)
    });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("step1") && e.is_state(MessageState::Completed) {
            s.send(e.outputs.get::<String>("my_data").unwrap() == "abc");
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_throw_error() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_output("my_data", json!(null))
            .with_run(r#" throw new Error("test error");"#)
    });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("step1") && e.is_state(MessageState::Error) {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

#[tokio::test]
async fn sch_step_run_catch_error() {
    let workflow = Workflow::new().with_step(|step| {
        step.with_id("step1")
            .with_output("my_data", json!(null))
            .with_run(r#" throw new Error("test error");"#)
            .with_catch(|c| c.with_then(|stmts| stmts.add(Act::msg(|act| act.with_key("msg1")))))
    });
    let ret = run_test(&workflow, |e, s| {
        if e.is_key("step1") && e.is_state(MessageState::Completed) {
            s.send(true);
        }
    })
    .await;
    assert!(ret);
}

async fn run_test<T: Clone + Send + 'static + Default>(
    workflow: &Workflow,
    exit_if: fn(&Event<Message>, sig: Signal<T>),
) -> T {
    let (engine, proc, tx, rx) = create_proc_signal2::<T>(workflow, &utils::longid());
    let s = rx.clone();
    engine.channel().on_message(move |e| {
        println!("message: {:?}", e);
        exit_if(e, rx.clone());
    });

    engine.channel().on_error(move |_| {
        s.close();
    });
    engine.runtime().launch(&proc);
    tx.recv().await
}

async fn run_test_dep<T: Clone + Send + 'static + Default>(
    workflow: &Workflow,
    dep: &Workflow,
    exit_if: fn(&Event<Message>, sig: Signal<T>),
) -> T {
    let (engine, proc, tx, rx) = create_proc_signal2::<T>(workflow, &utils::longid());
    engine.executor().model().deploy(dep).unwrap();
    engine.channel().on_message(move |e| {
        println!("message: {:?}", e);
        exit_if(e, rx.clone());
    });
    engine.runtime().launch(&proc);
    tx.recv().await
}