use async_trait::async_trait;
use floxide::*;
/// Workflow node that doubles a single `i32` item.
#[derive(Debug, Clone)]
pub struct Multiply2;

#[async_trait]
impl Node for Multiply2 {
    type Input = i32;
    type Output = i32;

    /// Doubles `input`, prints the computation to stdout and forwards the
    /// result with [`Transition::Next`].
    ///
    /// The context is unused. This implementation never returns `Err`; the
    /// multiplication is plain `i32` arithmetic, so overflow behaviour is
    /// whatever the build's overflow-check setting dictates.
    async fn process(
        &self,
        _ctx: &(),
        input: i32,
    ) -> Result<Transition<Self::Output>, FloxideError> {
        let doubled = input * 2;
        println!("Multiply2: {} * 2 = {}", input, doubled);
        let next = Transition::Next(doubled);
        Ok(next)
    }
}
/// Routing decision produced by `BranchAfterMultiply` for a whole batch.
///
/// Each variant carries the batch it was computed from, unchanged.
#[derive(Debug, Clone)]
pub enum BatchAction {
    /// The batch's sum was strictly greater than the branch threshold.
    Large(Vec<i32>),
    /// The batch's sum was less than or equal to the branch threshold.
    Small(Vec<i32>),
}
/// Routing node that classifies a whole batch as [`BatchAction::Large`] or
/// [`BatchAction::Small`] by comparing the sum of its items with `threshold`.
#[derive(Clone, Debug)]
pub struct BranchAfterMultiply {
    /// A batch whose sum is strictly greater than this value is `Large`;
    /// everything else (including an empty batch, whose sum is 0) is `Small`
    /// unless the threshold is negative.
    threshold: i32,
}

#[async_trait]
impl Node for BranchAfterMultiply {
    type Input = Vec<i32>;
    type Output = BatchAction;

    /// Sums `inputs`, prints the decision to stdout and forwards the original
    /// batch wrapped in the matching [`BatchAction`] variant.
    ///
    /// The context is unused and this implementation never returns `Err`.
    async fn process(
        &self,
        _ctx: &(),
        inputs: Vec<i32>,
    ) -> Result<Transition<Self::Output>, FloxideError> {
        // Accumulate in i64: summing in i32 overflows for large batches, which
        // panics when overflow checks are on and otherwise wraps around and can
        // select the wrong branch.
        let sum: i64 = inputs.iter().copied().map(i64::from).sum();
        let action = if sum > i64::from(self.threshold) {
            println!("Branch: sum {} > {} => Large", sum, self.threshold);
            BatchAction::Large(inputs)
        } else {
            println!("Branch: sum {} <= {} => Small", sum, self.threshold);
            BatchAction::Small(inputs)
        };
        Ok(Transition::Next(action))
    }
}
/// Terminal node that receives batches routed through the `Large` branch.
#[derive(Debug, Clone)]
pub struct LargeNode;

#[async_trait]
impl Node for LargeNode {
    type Input = Vec<i32>;
    type Output = ();

    /// Prints the received batch to stdout and completes with
    /// `Transition::Next(())`.
    ///
    /// The context is unused and this implementation never returns `Err`.
    async fn process(
        &self,
        _ctx: &(),
        inputs: Vec<i32>,
    ) -> Result<Transition<Self::Output>, FloxideError> {
        println!("LargeNode: {:?}", inputs);
        let done = Transition::Next(());
        Ok(done)
    }
}
/// Terminal node that receives batches routed through the `Small` branch.
#[derive(Debug, Clone)]
pub struct SmallNode;

#[async_trait]
impl Node for SmallNode {
    type Input = Vec<i32>;
    type Output = ();

    /// Prints the received batch to stdout and completes with
    /// `Transition::Next(())`.
    ///
    /// The context is unused and this implementation never returns `Err`.
    async fn process(
        &self,
        _ctx: &(),
        inputs: Vec<i32>,
    ) -> Result<Transition<Self::Output>, FloxideError> {
        println!("SmallNode: {:?}", inputs);
        let done = Transition::Next(());
        Ok(done)
    }
}
// Declares `BatchWorkflow` and its node graph through floxide's `workflow!`
// macro. Only plain `//` comments are used in here: `///` doc comments would
// be handed to the macro as tokens.
//
// Graph as written below:
//   multiply -> branch -> large   (when branch yields BatchAction::Large)
//                      -> small   (when branch yields BatchAction::Small)
workflow! {
// One field per node in the graph; the field names are the identifiers used
// in `start` and `edges`.
pub struct BatchWorkflow {
// `Multiply2` wrapped in floxide's `BatchNode` with a `()` context.
// NOTE(review): presumably this applies `Multiply2` to every item of a
// `Vec<i32>` input — confirm against the `BatchNode` documentation.
multiply: BatchNode<(), Multiply2>,
branch: BranchAfterMultiply,
large: LargeNode,
small: SmallNode,
}
// Entry node of the workflow.
start = multiply;
// Context type shared by the nodes; every node here takes `&()`.
context = ();
edges {
// The output of `multiply` always goes to `branch`.
multiply => { [ branch ] };
// `branch` is routed by matching on its `BatchAction` output.
// NOTE(review): the bound payload `v` is presumably what gets forwarded to
// the listed successor — confirm against the macro documentation.
branch => {
BatchAction::Large(v) => [ large ];
BatchAction::Small(v) => [ small ];
};
// `large` and `small` list no successors.
large => {};
small => {};
}
}
/// Builds a [`BatchWorkflow`] and runs it once on the fixed input
/// `[1, 2, 3, 4, 5]`, printing the input to stdout first.
///
/// The branch node is configured with a threshold of 20.
///
/// # Errors
///
/// Returns the error produced by the workflow run, boxed as
/// `Box<dyn std::error::Error>`.
pub async fn run_batch_example() -> Result<(), Box<dyn std::error::Error>> {
    let inputs = vec![1, 2, 3, 4, 5];

    // NOTE(review): the second argument to `BatchNode::new` is presumably the
    // batch size / concurrency — confirm against the floxide documentation.
    let multiply = BatchNode::new(Multiply2, 2);
    let branch = BranchAfterMultiply { threshold: 20 };
    let workflow = BatchWorkflow {
        multiply,
        branch,
        large: LargeNode,
        small: SmallNode,
    };
    let ctx = WorkflowCtx::new(());

    println!("Running batch workflow on {:?}", inputs);
    // The workflow's own output is discarded; only failure is propagated.
    workflow.run(&ctx, inputs).await?;
    Ok(())
}
/// Binary entry point: installs a `tracing` subscriber capped at `DEBUG`
/// level, then runs the batch example and returns its result.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let subscriber = tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG);
    subscriber.init();

    run_batch_example().await
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: the example workflow must complete without an error.
    #[tokio::test]
    async fn test_batch_example() {
        let outcome = run_batch_example().await;
        outcome.expect("batch workflow should run");
    }
}