use dataflow_rs::prelude::*;
use serde_json::json;
/// Runs a small error-handling demonstration against an inline workflow.
///
/// The workflow is declared as JSON and contains three tasks, in order:
///
/// 1. `load` — `parse_json` with source `payload` and target `input`.
/// 2. `validate_optional` — a `validation` rule on `data.input.email`,
///    flagged with `"continue_on_error": true`.
/// 3. `greet` — a `map` that writes a concatenated greeting to
///    `data.greeting` using `data.input.name`.
///
/// A message seeded with `{"name": "World"}` is pushed through the engine,
/// after which the outcome, any errors recorded on the message, and the
/// `greeting` entry of the message data are printed to stdout.
///
/// # Errors
///
/// Propagates a failure from parsing the workflow JSON or from building the
/// engine. A failure reported by `process_message` is only printed; it does
/// not change the return value.
#[tokio::main]
async fn main() -> Result<()> {
    // The literal's lines are deliberately flush-left: whitespace inside a
    // raw string is part of the string handed to the parser.
    let definition_json = r#"{
"id": "demo",
"name": "Error-handling demo",
"tasks": [
{
"id": "load",
"name": "Load payload",
"function": {
"name": "parse_json",
"input": { "source": "payload", "target": "input" }
}
},
{
"id": "validate_optional",
"name": "Optional validation (allowed to fail)",
"continue_on_error": true,
"function": {
"name": "validation",
"input": {
"rules": [
{
"logic": { "!!": {"var": "data.input.email"} },
"message": "email is required"
}
]
}
}
},
{
"id": "greet",
"name": "Build greeting",
"function": {
"name": "map",
"input": {
"mappings": [
{
"path": "data.greeting",
"logic": { "cat": ["Hello, ", {"var": "data.input.name"}, "!"] }
}
]
}
}
}
]
}"#;

    let definition = Workflow::from_json(definition_json)?;
    let engine = Engine::builder()
        .with_workflow(definition)
        .build()?;

    // The seed carries a `name` but no `email`; presumably that is what makes
    // the optional validation task record an error — TODO confirm against the
    // dataflow_rs validation semantics.
    let seed = json!({"name": "World"});
    let mut message = Message::from_value(&seed);

    // Report the engine outcome without aborting, so the message state can
    // still be inspected below either way.
    let outcome = engine.process_message(&mut message).await;
    match outcome {
        Err(e) => println!("engine: workflow halted early: {e}"),
        Ok(()) => println!("engine: workflow ran to completion"),
    }

    if message.has_errors() {
        println!("recorded errors:");
        for issue in message.errors() {
            // Missing identifiers are rendered as "-".
            let workflow = issue.workflow_id.as_deref().unwrap_or("-");
            let task = issue.task_id.as_deref().unwrap_or("-");
            let msg = &issue.message;
            println!(" [{workflow}/{task}] {msg}");
        }
    } else {
        println!("no errors recorded");
    }

    let greeting = &message.data()["greeting"];
    println!("greeting: {}", greeting);

    Ok(())
}