#![cfg(feature = "wasm-engine")]
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use actr_hyper::test_support::instantiate_wasm_workload;
use actr_hyper::wasm::{WasmError, WasmHost};
use actr_hyper::workload::{HostAbiFn, HostOperation, HostOperationResult, InvocationContext};
use actr_protocol::{ActrId, ActrType, Realm, RpcEnvelope, prost::Message as ProstMessage};
#[path = "wasm_actor_fixture.rs"]
mod wasm_actor_fixture;
fn fixture_component_bytes() -> &'static [u8] {
wasm_actor_fixture::WASM_ACTOR_FIXTURE
}
fn test_actr_id() -> ActrId {
ActrId {
realm: Realm { realm_id: 1 },
serial_number: 1,
r#type: ActrType {
manufacturer: "test".to_string(),
name: "fixture".to_string(),
version: "0.1.0".to_string(),
},
}
}
fn test_ctx() -> InvocationContext {
InvocationContext {
self_id: test_actr_id(),
caller_id: None,
request_id: "test-req".to_string(),
}
}
fn make_envelope(route_key: &str, payload: Vec<u8>) -> Vec<u8> {
RpcEnvelope {
route_key: route_key.to_string(),
payload: Some(payload.into()),
request_id: "test-req".to_string(),
..Default::default()
}
.encode_to_vec()
}
fn doubling_bridge(sleep: Option<Duration>) -> (HostAbiFn, Arc<AtomicU64>) {
let counter = Arc::new(AtomicU64::new(0));
let counter_clone = counter.clone();
let bridge: HostAbiFn = Arc::new(move |op| {
let counter = counter_clone.clone();
let sleep = sleep;
Box::pin(async move {
counter.fetch_add(1, Ordering::SeqCst);
if let Some(dur) = sleep {
tokio::time::sleep(dur).await;
}
match op {
HostOperation::CallRaw(req) if req.route_key == "test/double_impl" => {
if req.payload.len() < 4 {
return HostOperationResult::Error(-1);
}
let x = i32::from_le_bytes([
req.payload[0],
req.payload[1],
req.payload[2],
req.payload[3],
]);
HostOperationResult::Bytes((x * 2).to_le_bytes().to_vec())
}
_ => HostOperationResult::Error(-1),
}
})
});
(bridge, counter)
}
fn unreachable_bridge() -> HostAbiFn {
Arc::new(|_| Box::pin(async move { HostOperationResult::Error(-1) }))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn component_model_basic_echo_round_trip() {
let host = WasmHost::compile(fixture_component_bytes()).expect("compile component");
let mut wl = instantiate_wasm_workload(&host).await.expect("instantiate");
let payload = b"hello-component".to_vec();
let req = make_envelope("test/echo", payload.clone());
let bridge = unreachable_bridge();
let reply = wl
.handle(&req, test_ctx(), &bridge)
.await
.expect("echo dispatch should succeed");
assert_eq!(reply, payload, "test/echo must round-trip the payload");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn component_model_cross_instance_parallelism() {
let host = WasmHost::compile(fixture_component_bytes()).expect("compile component");
let mut wl_a = instantiate_wasm_workload(&host)
.await
.expect("instantiate A");
let mut wl_b = instantiate_wasm_workload(&host)
.await
.expect("instantiate B");
let (bridge_a, ca) = doubling_bridge(Some(Duration::from_millis(50)));
let (bridge_b, cb) = doubling_bridge(Some(Duration::from_millis(50)));
let req_a = make_envelope("test/double", 7i32.to_le_bytes().to_vec());
let req_b = make_envelope("test/double", 11i32.to_le_bytes().to_vec());
let ctx_a = test_ctx();
let ctx_b = test_ctx();
let t0 = Instant::now();
let (ra, rb) = tokio::join!(
async { wl_a.handle(&req_a, ctx_a, &bridge_a).await },
async { wl_b.handle(&req_b, ctx_b, &bridge_b).await },
);
let elapsed = t0.elapsed();
let reply_a = ra.expect("dispatch A should succeed");
let reply_b = rb.expect("dispatch B should succeed");
let val_a = i32::from_le_bytes([reply_a[0], reply_a[1], reply_a[2], reply_a[3]]);
let val_b = i32::from_le_bytes([reply_b[0], reply_b[1], reply_b[2], reply_b[3]]);
assert_eq!(val_a, 14, "7 * 2 = 14 from bridge A");
assert_eq!(val_b, 22, "11 * 2 = 22 from bridge B");
assert_eq!(ca.load(Ordering::SeqCst), 1, "bridge A must be called once");
assert_eq!(cb.load(Ordering::SeqCst), 1, "bridge B must be called once");
let elapsed_ms = elapsed.as_secs_f64() * 1000.0;
assert!(
elapsed_ms < 90.0,
"cross-instance dispatches must run concurrently; saw {elapsed_ms:.1} ms \
(two 50 ms host sleeps, serial would be ~100 ms)"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn component_model_executor_non_blocking_during_host_await() {
let host = WasmHost::compile(fixture_component_bytes()).expect("compile component");
let mut wl = instantiate_wasm_workload(&host).await.expect("instantiate");
let tick_count = Arc::new(AtomicU64::new(0));
let tick_stop = Arc::new(AtomicBool::new(false));
let tc = tick_count.clone();
let ts = tick_stop.clone();
let ticker = tokio::spawn(async move {
while !ts.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(10)).await;
tc.fetch_add(1, Ordering::SeqCst);
}
});
let (bridge, _counter) = doubling_bridge(Some(Duration::from_millis(80)));
let req = make_envelope("test/double", 3i32.to_le_bytes().to_vec());
let _ = wl
.handle(&req, test_ctx(), &bridge)
.await
.expect("double dispatch should succeed");
tick_stop.store(true, Ordering::SeqCst);
let _ = ticker.await;
let ticks = tick_count.load(Ordering::SeqCst);
assert!(
ticks >= 3,
"tokio executor must keep running during guest host-import await; saw {ticks} ticks \
(expected ~8 ticks over 80 ms sleep)"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn component_model_error_variant_propagates() {
let host = WasmHost::compile(fixture_component_bytes()).expect("compile component");
let mut wl = instantiate_wasm_workload(&host).await.expect("instantiate");
let bridge = unreachable_bridge();
let req = make_envelope("unknown/route", Vec::new());
let err = wl
.handle(&req, test_ctx(), &bridge)
.await
.expect_err("unknown route must surface guest error");
match &err {
WasmError::ExecutionFailed(msg) => {
assert!(
msg.contains("UnknownRoute"),
"error should carry the UnknownRoute variant, got: {msg}"
);
}
other => panic!("expected ExecutionFailed, got {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn component_model_panic_after_await_surfaces_as_trap() {
let host = WasmHost::compile(fixture_component_bytes()).expect("compile component");
let mut wl = instantiate_wasm_workload(&host).await.expect("instantiate");
let (bridge, counter) = doubling_bridge(Some(Duration::from_millis(10)));
let req = make_envelope("test/boom-after-await", 1i32.to_le_bytes().to_vec());
let err = wl
.handle(&req, test_ctx(), &bridge)
.await
.expect_err("post-await panic must surface as a host-visible error");
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"host bridge must have serviced the pre-panic call_raw exactly once"
);
match &err {
WasmError::ExecutionFailed(msg) => {
let lower = msg.to_ascii_lowercase();
assert!(
lower.contains("trap") || lower.contains("panic"),
"expected trap/panic in error message, got: {msg}"
);
}
other => panic!("expected ExecutionFailed, got {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn component_model_per_call_overhead() {
let host = WasmHost::compile(fixture_component_bytes()).expect("compile component");
let mut wl = instantiate_wasm_workload(&host).await.expect("instantiate");
let bridge = unreachable_bridge();
let payload = vec![0u8; 64];
let req = make_envelope("test/echo", payload);
let _ = wl.handle(&req, test_ctx(), &bridge).await.expect("warm-up");
let iters: u64 = 1000;
let t0 = Instant::now();
for _ in 0..iters {
let _ = wl
.handle(&req, test_ctx(), &bridge)
.await
.expect("bench dispatch");
}
let elapsed = t0.elapsed();
let per_call_us = elapsed.as_secs_f64() * 1_000_000.0 / iters as f64;
eprintln!(
"[component_model_per_call_overhead] {iters} sequential dispatches in {:.2} ms; \
per call: {per_call_us:.2} us (Phase 0.5 spike baseline: ~1100 us with 50 ms host sleep \
folded in; this measurement excludes host sleep so numbers are not one-to-one).",
elapsed.as_secs_f64() * 1000.0
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn component_model_call_on_start_does_not_trap() {
let host = WasmHost::compile(fixture_component_bytes()).expect("compile component");
let mut wl = instantiate_wasm_workload(&host).await.expect("instantiate");
wl.call_on_start()
.await
.expect("call_on_start should no longer trap with a lifecycle invocation context");
let req = make_envelope("test/echo", b"after-on-start".to_vec());
let reply = wl
.handle(&req, test_ctx(), &unreachable_bridge())
.await
.expect("dispatch after on_start should succeed");
assert_eq!(reply, b"after-on-start");
}