acts-next 0.15.5

a fast, tiny, extensiable workflow engine
Documentation
use crate::{
    scheduler::{tests::create_proc_signal, TaskState},
    utils::{self, consts},
    Act, StmtBuild, Vars, Workflow,
};
use serde_json::json;

#[tokio::test]
async fn sch_step_setup_each_list() {
    let mut workflow = Workflow::new().with_step(|step| {
        step.with_id("step1").with_setup(|setup| {
            setup.add(Act::each(|each| {
                each.with_in(r#"["u1", "u2"]"#).with_then(|stmts| {
                    stmts.add(Act::irq(|act| act.with_key("act1")).with_id("act1"))
                })
            }))
        })
    });

    workflow.print();
    let (proc, scher, emitter, tx, rx) = create_proc_signal::<()>(&mut workflow, &utils::longid());
    emitter.on_message(move |e| {
        println!("message: {:?}", e);
        if e.is_source("act") {
            rx.close();
        }
    });
    scher.launch(&proc);
    tx.recv().await;
    proc.print();
    let tasks = proc.task_by_nid("act1");
    assert_eq!(tasks.first().unwrap().state(), TaskState::Interrupt);
    assert!(tasks.iter().any(|t| {
        let inputs = t.inputs();
        inputs.get::<String>(consts::ACT_VALUE).unwrap() == "u1"
            && inputs.get::<i32>(consts::ACT_INDEX).unwrap() == 0
    }));
    assert!(tasks.iter().any(|t| {
        let inputs = t.inputs();
        inputs.get::<String>(consts::ACT_VALUE).unwrap() == "u2"
            && inputs.get::<i32>(consts::ACT_INDEX).unwrap() == 1
    }));
}

#[tokio::test]
async fn sch_step_setup_each_var() {
    let mut workflow = Workflow::new().with_step(|step| {
        step.with_id("step1").with_setup(|stmts| {
            stmts
                .add(Act::set(Vars::new().with("a", ["u1", "u2"])))
                .add(Act::each(|each| {
                    each.with_in(r#"$("a")"#).with_then(|stmts| {
                        stmts.add(Act::irq(|act| act.with_key("act1")).with_id("act1"))
                    })
                }))
        })
    });

    workflow.print();
    let (proc, scher, emitter, tx, rx) = create_proc_signal::<()>(&mut workflow, &utils::longid());
    emitter.on_message(move |e| {
        println!("message: {:?}", e);
        if e.is_source("act") {
            rx.close();
        }
    });
    scher.launch(&proc);
    tx.recv().await;
    proc.print();
    let tasks = proc.task_by_nid("act1");
    assert_eq!(tasks.first().unwrap().state(), TaskState::Interrupt);

    assert!(tasks.iter().any(|t| {
        let inputs = t.inputs();
        inputs.get::<String>(consts::ACT_VALUE).unwrap() == "u1"
            && inputs.get::<i32>(consts::ACT_INDEX).unwrap() == 0
    }));
    assert!(tasks.iter().any(|t| {
        let inputs = t.inputs();
        inputs.get::<String>(consts::ACT_VALUE).unwrap() == "u2"
            && inputs.get::<i32>(consts::ACT_INDEX).unwrap() == 1
    }));
}

#[tokio::test]
async fn sch_step_setup_each_var_not_exist() {
    let mut workflow = Workflow::new().with_step(|step| {
        step.with_id("step1").with_setup(|stmts| {
            stmts.add(Act::each(|each| {
                each.with_in(r#"$("not_exists")"#)
                    .with_then(|stmts| stmts.add(Act::irq(|act| act.with_key("act1"))))
            }))
        })
    });

    workflow.print();
    let (proc, scher, _, tx, _) = create_proc_signal::<()>(&mut workflow, &utils::longid());
    scher.launch(&proc);
    tx.recv().await;
    proc.print();
    assert!(proc.state().is_error());
}

#[tokio::test]
async fn sch_step_setup_each_code() {
    let mut workflow = Workflow::new().with_step(|step| {
        step.with_id("step1").with_setup(|setup| {
            setup
                .add(Act::set(Vars::new().with("a", ["u1", "u2"])))
                .add(Act::each(|each| {
                    each.with_in(
                        r#"
                        let a = $("a");
                        let b = ["u3"];
                        let c = [ "u1" ];
                        let d = [ "u3", "u4" ];

                        // result = u3
                        a.union(b).difference(c).intersection(d)
                        "#,
                    )
                    .with_then(|stmts| {
                        stmts.add(Act::irq(|act| act.with_key("act1")).with_id("act1"))
                    })
                }))
        })
    });

    workflow.print();
    let (proc, scher, emitter, tx, rx) = create_proc_signal::<()>(&mut workflow, &utils::longid());
    emitter.on_message(move |e| {
        println!("message: {:?}", e);
        if e.is_source("act") {
            rx.close();
        }
    });
    scher.launch(&proc);
    tx.recv().await;
    proc.print();
    assert_eq!(
        proc.task_by_nid("act1").first().unwrap().state(),
        TaskState::Interrupt
    );
    assert_eq!(
        proc.task_by_nid("act1")
            .first()
            .unwrap()
            .inputs()
            .get_value(consts::ACT_VALUE)
            .unwrap(),
        &json!("u3")
    );
}