use std::time::Instant;
use noetl_tools::tools::TaskSequenceTool;
use noetl_tools::{ExecutionContext, Tool, ToolConfig};
fn seed_context() -> ExecutionContext {
let mut ctx = ExecutionContext::new(987654321, "process_pft_action_batches", "http://server");
ctx.worker_id = Some("worker-bench-1".to_string());
ctx.command_id = Some("cmd-bench-1".to_string());
ctx.set_variable(
"workload",
serde_json::json!({
"pft_batch_size": 25,
"pft_batch_concurrency": 16,
"data_type": "mds",
"base_url": "http://paginated-api/api/v1/pft/batch",
"page_size": 10,
}),
);
ctx.set_variable("iter", serde_json::json!({"i": 0}));
for s in 0..8 {
let rows: Vec<serde_json::Value> = (0..40)
.map(|i| {
serde_json::json!({
"patient_id": format!("patient_{s}_{i:04}"),
"facility": format!("facility_{}", s % 10),
"mds": {"a01": i, "b02": "value", "c03": [1, 2, 3, 4]},
"detail": "lorem ipsum dolor sit amet consectetur adipiscing",
})
})
.collect();
ctx.set_variable(
format!("prior_batch_{s}"),
serde_json::json!({"rows": rows, "count": 40, "data_type": "mds"}),
);
}
ctx
}
const DRAIN_ITERS: u64 = 300;
fn drain_config() -> ToolConfig {
let when = format!("{{{{ iter.i < {DRAIN_ITERS} }}}}");
let tasks = serde_json::json!([
{
"drain_step": {
"kind": "noop",
"input": {
"url": "{{ workload.base_url }}/{{ workload.data_type }}",
"batch_size": "{{ workload.pft_batch_size }}",
"page_size": "{{ workload.page_size }}"
},
"result": {
"data": {"records": [{"patient_id": "p1"}, {"patient_id": "p2"}]},
"batch_id": "b-001"
},
"set": {
"iter.fetched": "{{ output.data.records }}",
"iter.batch_id": "{{ output.batch_id }}"
},
"spec": {
"policy": {
"rules": [
{
"when": when,
"then": {
"do": "jump",
"to": "drain_step",
"set": {"iter.i": "{{ iter.i + 1 }}"}
}
},
{"else": {"then": {"do": "break"}}}
]
}
}
}
}
]);
ToolConfig {
kind: "task_sequence".to_string(),
config: tasks,
timeout: None,
retry: None,
auth: None,
}
}
fn main() {
let runs: u64 = std::env::var("BENCH_RUNS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(400);
let subtasks_per_run = DRAIN_ITERS + 1;
let tool = TaskSequenceTool::new();
let ctx = seed_context();
let config = drain_config();
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.expect("runtime");
for _ in 0..20 {
let _ = rt.block_on(tool.execute(&config, &ctx)).expect("warmup");
}
let start = Instant::now();
let acc = std::hint::black_box(rt.block_on(async {
let mut acc = 0u64;
for _ in 0..runs {
let r = tool.execute(&config, &ctx).await.expect("execute");
acc += r.is_success() as u64;
}
acc
}));
let elapsed = start.elapsed();
assert_eq!(acc, runs, "every drain must succeed");
let subtasks = runs * subtasks_per_run;
let per_drain_us = elapsed.as_secs_f64() * 1e6 / runs as f64;
let per_subtask_us = elapsed.as_secs_f64() * 1e6 / subtasks as f64;
println!("runs={runs} subtasks={subtasks} total={:.3}s", elapsed.as_secs_f64());
println!("per_drain={per_drain_us:.2}us per_subtask={per_subtask_us:.2}us");
println!(
"drains_per_sec={:.0} subtasks_per_sec={:.0}",
runs as f64 / elapsed.as_secs_f64(),
subtasks as f64 / elapsed.as_secs_f64()
);
}