use dataflow_rs::{Engine, Message, Workflow};
use serde_json::{Value, json};
use std::sync::Arc;
use std::time::Instant;
/// Number of timed iterations per engine configuration.
const N: usize = 500_000;
/// Number of untimed warm-up iterations run before the timed loop.
const WARMUP: usize = 50_000;
/// Number of map+validate stages generated for every workflow layout.
const NUM_STAGES: usize = 5;
/// Renders the JSON for stage `i` as two comma-separated task objects.
///
/// The first task (`map_{i}`) copies fields from `data.input` into
/// `stage{i}.*` (plus one addition and one comparison); the second
/// (`validate_{i}`) checks the `stage{i}.id` and `stage{i}.amount` values that
/// the map task wrote. The result is a fragment meant to be spliced into a
/// `"tasks": [ ... ]` array, so it carries no surrounding brackets.
fn stage_tasks_json(i: usize) -> String {
    // Mapping task: keeps the leading newline of the combined fragment.
    let map_task = format!(
        r#"
{{
"id": "map_{i}",
"name": "Transform {i}",
"function": {{
"name": "map",
"input": {{
"mappings": [
{{ "path": "stage{i}.id", "logic": {{ "var": "data.input.id" }} }},
{{ "path": "stage{i}.name", "logic": {{ "var": "data.input.party.name" }} }},
{{ "path": "stage{i}.amount", "logic": {{ "var": "data.input.amount.value" }} }},
{{ "path": "stage{i}.ccy", "logic": {{ "var": "data.input.amount.currency" }} }},
{{ "path": "stage{i}.adj", "logic": {{ "+": [ {{ "var": "data.input.amount.value" }}, {i} ] }} }},
{{ "path": "stage{i}.flag", "logic": {{ ">": [ {{ "var": "data.input.amount.value" }}, 0 ] }} }}
]
}}
}}
}}"#
    );

    // Validation task: reads back what the mapping task produced.
    let validate_task = format!(
        r#"{{
"id": "validate_{i}",
"name": "Validate {i}",
"function": {{
"name": "validation",
"input": {{
"rules": [
{{ "path": "stage{i}.id", "logic": {{ "!!": {{ "var": "data.stage{i}.id" }} }}, "message": "id required" }},
{{ "path": "stage{i}.amount", "logic": {{ ">": [ {{ "var": "data.stage{i}.amount" }}, 0 ] }}, "message": "amount must be positive" }}
]
}}
}}
}}"#
    );

    format!("{map_task},\n{validate_task}")
}
/// Returns the JSON for the `parse` task, which invokes the `parse_json`
/// function with source `payload` and target `input`.
///
/// Like the stage fragments, this is a bare task object intended to be placed
/// inside a `"tasks": [ ... ]` array.
fn parse_task_json() -> &'static str {
    const PARSE_TASK: &str = r#"
{
"id": "parse",
"name": "Load payload",
"function": { "name": "parse_json", "input": { "source": "payload", "target": "input" } }
}"#;
    PARSE_TASK
}
/// Builds the single `wf_grouped` workflow: the parse task followed by the
/// map+validate tasks of every stage, all in one task list.
///
/// # Panics
///
/// Panics if `Workflow::from_json` rejects the generated JSON.
fn grouped_workflow() -> Workflow {
    let parse_task = parse_task_json();
    let stage_tasks = (0..NUM_STAGES)
        .map(stage_tasks_json)
        .collect::<Vec<String>>()
        .join(",");

    let definition = format!(
        r#"{{
"id": "wf_grouped",
"name": "Grouped",
"priority": 0,
"condition": true,
"tasks": [ {},{} ]
}}"#,
        parse_task, stage_tasks
    );

    Workflow::from_json(&definition).unwrap()
}
/// Builds one workflow per stage, `wf_0` through `wf_{NUM_STAGES - 1}`, with
/// the stage index used as the priority.
///
/// `wf_0` additionally starts with the parse task. When `conditioned` is
/// `false` every workflow has the literal condition `true`; when it is `true`,
/// every workflow after the first is gated on
/// `metadata.progress.workflow_id` being equal to the previous workflow's id.
///
/// # Panics
///
/// Panics if `Workflow::from_json` rejects any generated JSON.
fn split_workflows(conditioned: bool) -> Vec<Workflow> {
    let mut workflows = Vec::with_capacity(NUM_STAGES);

    for stage in 0..NUM_STAGES {
        let is_first = stage == 0;

        // The first workflow has no predecessor to wait for, so it is never gated.
        let gate = if is_first || !conditioned {
            "true".to_string()
        } else {
            format!(
                r#"{{ "==": [ {{ "var": "metadata.progress.workflow_id" }}, "wf_{}" ] }}"#,
                stage - 1
            )
        };

        // Only the first workflow carries the parse task ahead of its stage tasks.
        let mut task_list = stage_tasks_json(stage);
        if is_first {
            task_list = format!("{},{}", parse_task_json(), task_list);
        }

        let definition = format!(
            r#"{{
"id": "wf_{i}",
"name": "Workflow {i}",
"priority": {i},
"condition": {condition},
"tasks": [ {tasks} ]
}}"#,
            i = stage,
            condition = gate,
            tasks = task_list
        );

        workflows.push(Workflow::from_json(&definition).unwrap());
    }

    workflows
}
/// Builds the benchmark input document.
///
/// The top-level `id`, `party` and `amount` fields are the ones the stage
/// mappings read; the group header and the 20 generated transactions add bulk
/// to the payload.
fn build_payload() -> Value {
    // Shared by the generated range and the header's declared transaction count.
    const TXN_COUNT: i32 = 20;

    // One synthetic transaction; every varying field is derived from `seq`.
    let make_txn = |seq: i32| -> Value {
        json!({
            "seq": seq,
            "ref": format!("REF-{:06}", seq),
            "debtor": {
                "name": format!("Debtor {}", seq),
                "account": format!("DE89{:018}", seq),
                "bic": "DEUTDEFF"
            },
            "creditor": {
                "name": format!("Creditor {}", seq),
                "account": format!("FR14{:018}", seq),
                "bic": "BNPAFRPP"
            },
            "amount": { "value": 1000 + seq, "currency": "EUR" },
            "remittance": format!("Invoice {} payment for services rendered in batch run", seq)
        })
    };

    let transactions: Vec<Value> = (0..TXN_COUNT).map(make_txn).collect();

    json!({
        "id": 12345,
        "party": { "name": "John Doe", "country": "DE", "id": "PARTY-001" },
        "amount": { "value": 250, "currency": "EUR" },
        "groupHeader": {
            "msgId": "MSG-2024-0001",
            "creationDateTime": "2024-06-13T10:00:00Z",
            "numberOfTxns": TXN_COUNT
        },
        "transactions": transactions
    })
}
/// Benchmarks `engine` against `payload` and returns the mean time per message
/// in nanoseconds.
///
/// The run has three phases:
/// 1. a single checked run asserting that the message's audit trail holds
///    exactly `expected_audit` entries;
/// 2. `WARMUP` untimed iterations;
/// 3. `N` timed iterations, after which one result row is printed.
///
/// Every iteration builds a fresh `Message` from `payload`, so message
/// construction is part of the measured time.
///
/// `label` names the configuration in the printed row and in the assertion
/// message.
///
/// # Panics
///
/// Panics if processing a message fails, or if the checked run does not yield
/// `expected_audit` audit entries.
async fn time_engine(label: &str, engine: &Engine, payload: &Value, expected_audit: usize) -> f64 {
    // Checked run: fail fast if the workflow chain did not execute completely,
    // since timing a partially executed chain would be meaningless.
    {
        let mut m = Message::from_value(payload);
        engine.process_message(&mut m).await.unwrap();
        assert_eq!(
            m.audit_trail().len(),
            expected_audit,
            "[{label}] expected {expected_audit} audit entries, got {} — chain did not fully execute",
            m.audit_trail().len()
        );
    }

    // Untimed warm-up.
    for _ in 0..WARMUP {
        let mut m = Message::from_value(payload);
        engine.process_message(&mut m).await.unwrap();
    }

    // Timed section.
    let start = Instant::now();
    for _ in 0..N {
        let mut m = Message::from_value(payload);
        engine.process_message(&mut m).await.unwrap();
    }
    let el = start.elapsed();

    let ns = el.as_nanos() as f64 / N as f64;
    // The label column is 17 wide so the longest label in use
    // ("split-progress(N)") still lines up with the other rows.
    println!(
        " {label:<17} {N} msgs in {:.3}s => {ns:>8.1} ns/msg => {:.0} msg/s",
        el.as_secs_f64(),
        N as f64 / el.as_secs_f64()
    );
    ns
}
/// Benchmark entry point.
///
/// Builds three engines over the same tasks — one grouped workflow, one
/// workflow per stage with constant `true` conditions, and one workflow per
/// stage gated on the previous workflow — times each with `time_engine`, and
/// prints how far the two split layouts are from the grouped one.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // One parse task plus a map and a validate task per stage; `time_engine`
    // asserts that the audit trail has exactly this many entries.
    let expected_audit = 1 + NUM_STAGES * 2;
    let payload = build_payload();

    let grouped = Arc::new(
        Engine::builder()
            .with_workflow(grouped_workflow())
            .build()
            .unwrap(),
    );

    // Both split engines differ only in whether the workflows are conditioned.
    let build_split = |conditioned: bool| {
        Arc::new(
            Engine::builder()
                .with_workflows(split_workflows(conditioned))
                .build()
                .unwrap(),
        )
    };
    let split_true = build_split(false);
    let split_prog = build_split(true);

    println!(
        "Multi-workflow bench: {NUM_STAGES} stages (parse + {NUM_STAGES}x map+validate), heavy data.input\n"
    );

    let grouped_ns = time_engine("grouped(1 wf)", &grouped, &payload, expected_audit).await;
    let split_true_ns =
        time_engine("split-true(N wf)", &split_true, &payload, expected_audit).await;
    let split_prog_ns =
        time_engine("split-progress(N)", &split_prog, &payload, expected_audit).await;

    // Absolute and relative distance of each split layout from the grouped baseline.
    let true_delta = split_true_ns - grouped_ns;
    let true_pct = true_delta / grouped_ns * 100.0;
    let prog_delta = split_prog_ns - grouped_ns;
    let prog_pct = prog_delta / grouped_ns * 100.0;

    println!(
        "\nHeadroom estimate (work is identical; only deep-walk count differs):\n \
         arena hoist : split-true - grouped = {:>7.1} ns/msg ({:+.1}%)\n \
         + in-arena cond : split-progress - grouped = {:>7.1} ns/msg ({:+.1}%)",
        true_delta, true_pct, prog_delta, prog_pct,
    );
}