use std::time::Duration;
use operonx::{Interrupt, Operon};
use serde_json::{json, Map, Value};
fn b1b_graph_json() -> String {
json!({
"schema_version": "1.0",
"type": "graph",
"name": "main",
"full_name": "main",
"entries": ["gen", "kicker"],
"exits": ["slow", "kicker"],
"initial_ready_count": {"gen": 0, "kicker": 0, "slow": 1},
"compiled_adj": {
"gen": [["slow", false]],
"slow": [],
"kicker": []
},
"inputs": {"seed": {"required": true}},
"outputs": {"out": {}},
"ops": {
"gen": {
"type": "code",
"name": "gen",
"full_name": "main.gen",
"func_name": "gen3",
"is_generator": true,
"bound": "sync",
"inputs": {
"_signal": {
"required": true,
"ref": {"source": "__PARENT__", "var": "seed"}
}
},
"outputs": {"i": {}}
},
"slow": {
"type": "code",
"name": "slow",
"full_name": "main.slow",
"func_name": "slow_double",
"is_async": true,
"bound": "io",
"inputs": {
"i": {
"required": true,
"ref": {"source": "main.gen", "var": "i"}
}
},
"outputs": {
"out": {
"ref": {
"source": "__PARENT__",
"var": "out",
"is_output": true
}
}
}
},
"kicker": {
"type": "code",
"name": "kicker",
"full_name": "main.kicker",
"func_name": "kicker_async",
"is_async": true,
"bound": "io",
"inputs": {
"_signal": {
"required": true,
"ref": {"source": "__PARENT__", "var": "seed"}
}
},
"outputs": {}
}
}
})
.to_string()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn b1b_siblings_dispatch_after_active_item_cancel() {
let graph = b1b_graph_json();
let engine = Operon::builder(&graph)
.no_resources()
.install_global_hub(false)
.load_dotenv(false)
.op("gen3", |_inputs: Map<String, Value>| {
Ok(json!([{"i": 0}, {"i": 1}, {"i": 2}]))
})
.op_async("slow_double", |inputs: Map<String, Value>| async move {
tokio::time::sleep(Duration::from_millis(200)).await;
let i = inputs.get("i").and_then(|v| v.as_i64()).unwrap_or(-1);
Ok(json!({"out": i}))
})
.op_async("kicker_async", |_inputs: Map<String, Value>| async move {
tokio::time::sleep(Duration::from_millis(50)).await;
let irq = Interrupt::new(vec!["main".to_string(), "yield_0".to_string()], "b1b");
Ok(irq.into())
})
.build()
.expect("build engine");
let mut inputs = Map::new();
inputs.insert("seed".into(), json!("x"));
let out = tokio::time::timeout(
Duration::from_millis(1500),
engine.run_json_async(inputs, None, None, None),
)
.await
.expect("scheduler should not stall after Interrupt cancel")
.expect("run_json_async failed");
let produced = out["out"].as_array().cloned().unwrap_or_default();
let mut js: Vec<i64> = produced.iter().filter_map(|v| v.as_i64()).collect();
js.sort();
assert_eq!(
js,
vec![1, 2],
"sequential edge stalled after Interrupt: got {js:?}, expected [1, 2] \
(item 0 cancelled, items 1 & 2 advance)"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn b1b_descendant_queued_items_dropped_on_subtree_sweep() {
let graph = b1b_graph_json();
let engine = Operon::builder(&graph)
.no_resources()
.install_global_hub(false)
.load_dotenv(false)
.op("gen3", |_inputs: Map<String, Value>| {
Ok(json!([{"i": 0}, {"i": 1}, {"i": 2}]))
})
.op_async("slow_double", |inputs: Map<String, Value>| async move {
tokio::time::sleep(Duration::from_millis(200)).await;
let i = inputs.get("i").and_then(|v| v.as_i64()).unwrap_or(-1);
Ok(json!({"out": i}))
})
.op_async("kicker_async", |_inputs: Map<String, Value>| async move {
tokio::time::sleep(Duration::from_millis(50)).await;
let irq = Interrupt::new(vec!["main".to_string()], "b1c");
Ok(irq.into())
})
.build()
.expect("build engine");
let mut inputs = Map::new();
inputs.insert("seed".into(), json!("x"));
let out = tokio::time::timeout(
Duration::from_millis(1000),
engine.run_json_async(inputs, None, None, None),
)
.await
.expect("scheduler should not hang after subtree sweep")
.expect("run_json_async failed");
let produced = out
.get("out")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
assert!(
produced.is_empty(),
"subtree sweep should have dropped all queued items; got {produced:?}"
);
}