actr-hyper 0.3.1

Hyper — Actor platform infrastructure: sandbox, transport, scheduler, WASM engine, signing, AIS bootstrap, persistence & crypto primitives
Documentation
#![cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]

use std::path::{Path, PathBuf};
use std::sync::Arc;
#[cfg(feature = "dynclib-engine")]
use std::sync::LazyLock;
use std::time::{Duration, UNIX_EPOCH};

use actr_hyper::test_support::{TestPackageHookEvent, runtime_context_with_host_transport};
use actr_hyper::workload::{HostAbiFn, HostOperation, HostOperationResult, InvocationContext};
use actr_protocol::{ActrError, ActrId, ActrType, PayloadType, Realm};

#[cfg(feature = "wasm-engine")]
use actr_hyper::test_support::instantiate_wasm_workload;
#[cfg(feature = "wasm-engine")]
use actr_hyper::wasm::WasmHost;

#[cfg(feature = "dynclib-engine")]
use actr_framework::guest::dynclib_abi::{InitPayloadV1, version};
#[cfg(feature = "dynclib-engine")]
use actr_hyper::dynclib::DynclibHost;
#[cfg(feature = "dynclib-engine")]
use actr_hyper::test_support::instantiate_dynclib_workload;

#[cfg(feature = "wasm-engine")]
#[path = "wasm_actor_fixture.rs"]
mod wasm_actor_fixture;

#[cfg(feature = "dynclib-engine")]
static DYNCLIB_PACKAGE_HOOK_TEST_LOCK: LazyLock<tokio::sync::Mutex<()>> =
    LazyLock::new(|| tokio::sync::Mutex::new(()));

fn test_actr_id() -> ActrId {
    ActrId {
        realm: Realm { realm_id: 1 },
        serial_number: 1,
        r#type: ActrType {
            manufacturer: "test".to_string(),
            name: "fixture".to_string(),
            version: "0.1.0".to_string(),
        },
    }
}

fn test_ctx() -> InvocationContext {
    test_ctx_with_request_id("package-hook-test")
}

fn test_ctx_with_request_id(request_id: &'static str) -> InvocationContext {
    InvocationContext {
        self_id: test_actr_id(),
        caller_id: None,
        request_id: request_id.to_string(),
    }
}

fn package_hook_cases() -> Vec<(TestPackageHookEvent, &'static str)> {
    let peer = test_actr_id();
    let expiry = UNIX_EPOCH + Duration::from_secs(1_725_000_000);
    vec![
        (
            TestPackageHookEvent::SignalingConnecting,
            "on_signaling_connecting",
        ),
        (
            TestPackageHookEvent::SignalingConnected,
            "on_signaling_connected",
        ),
        (
            TestPackageHookEvent::SignalingDisconnected,
            "on_signaling_disconnected",
        ),
        (
            TestPackageHookEvent::WebSocketConnecting { peer: peer.clone() },
            "on_websocket_connecting:peer=1:relayed=none",
        ),
        (
            TestPackageHookEvent::WebSocketConnected { peer: peer.clone() },
            "on_websocket_connected:peer=1:relayed=none",
        ),
        (
            TestPackageHookEvent::WebSocketDisconnected { peer: peer.clone() },
            "on_websocket_disconnected:peer=1:relayed=none",
        ),
        (
            TestPackageHookEvent::WebRtcConnecting { peer: peer.clone() },
            "on_webrtc_connecting:peer=1:relayed=none",
        ),
        (
            TestPackageHookEvent::WebRtcConnected {
                peer: peer.clone(),
                relayed: true,
            },
            "on_webrtc_connected:peer=1:relayed=true",
        ),
        (
            TestPackageHookEvent::WebRtcDisconnected { peer },
            "on_webrtc_disconnected:peer=1:relayed=none",
        ),
        (
            TestPackageHookEvent::CredentialRenewed { new_expiry: expiry },
            "on_credential_renewed:expiry=1725000000",
        ),
        (
            TestPackageHookEvent::CredentialExpiring { new_expiry: expiry },
            "on_credential_expiring:expiry=1725000000",
        ),
        (
            TestPackageHookEvent::MailboxBackpressure {
                queue_len: 7,
                threshold: 3,
            },
            "on_mailbox_backpressure:queue_len=7:threshold=3",
        ),
    ]
}

fn recording_bridge() -> (HostAbiFn, tokio::sync::mpsc::UnboundedReceiver<String>) {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let bridge: HostAbiFn = Arc::new(move |op| {
        let tx = tx.clone();
        Box::pin(async move {
            match op {
                HostOperation::CallRaw(req) if req.route_key == "test/record_hook" => {
                    let name = String::from_utf8(req.payload).expect("hook name is utf8");
                    let _ = tx.send(name);
                    HostOperationResult::Bytes(Vec::new())
                }
                _ => HostOperationResult::Error(-1),
            }
        })
    });
    (bridge, rx)
}

fn recording_host_transport() -> (
    Arc<actr_hyper::HostTransport>,
    tokio::sync::mpsc::UnboundedReceiver<String>,
    tokio::task::JoinHandle<()>,
) {
    let host_transport = Arc::new(actr_hyper::HostTransport::new());
    let recorder_transport = host_transport.clone();
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let task = tokio::spawn(async move {
        let request_lane = recorder_transport
            .get_lane(PayloadType::RpcReliable, None)
            .await
            .expect("host transport reliable lane should exist");
        while let Ok(envelope) = request_lane.recv_envelope().await {
            let request_id = envelope.request_id.clone();
            match envelope.route_key.as_str() {
                "test/record_hook" => {
                    let payload = envelope.payload.unwrap_or_default();
                    let name = String::from_utf8(payload.to_vec()).expect("hook name is utf8");
                    let _ = tx.send(name);
                    let _ = recorder_transport
                        .complete_response(&request_id, bytes::Bytes::new())
                        .await;
                }
                route_key => {
                    let _ = tx.send(format!("unexpected route: {route_key}"));
                    let _ = recorder_transport
                        .complete_error(&request_id, ActrError::UnknownRoute(route_key.to_string()))
                        .await;
                }
            }
        }
    });

    (host_transport, rx, task)
}

async fn assert_recorded_hooks(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<String>,
    expected: Vec<&'static str>,
) {
    for expected_name in expected {
        let observed = tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timed out waiting for hook record")
            .expect("recording bridge dropped");
        assert_eq!(observed, expected_name);
    }
}

#[cfg(feature = "wasm-engine")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wasm_package_receives_runtime_hook_events() {
    let host =
        WasmHost::compile(wasm_actor_fixture::WASM_ACTOR_FIXTURE).expect("compile component");
    let mut workload = instantiate_wasm_workload(&host)
        .await
        .expect("instantiate wasm workload");
    let (bridge, rx) = recording_bridge();
    let cases = package_hook_cases();
    let expected = cases.iter().map(|(_, name)| *name).collect::<Vec<_>>();

    for (event, _) in cases {
        workload
            .call_hook_event(event, test_ctx(), &bridge)
            .await
            .expect("wasm hook event should dispatch");
    }

    assert_recorded_hooks(rx, expected).await;
}

#[cfg(feature = "wasm-engine")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wasm_package_observer_bridge_reaches_guest_hooks() {
    let host =
        WasmHost::compile(wasm_actor_fixture::WASM_ACTOR_FIXTURE).expect("compile component");
    let workload = instantiate_wasm_workload(&host)
        .await
        .expect("instantiate wasm workload");
    let observer = workload.into_package_hook_observer();
    let (host_transport, rx, recorder) = recording_host_transport();
    let ctx = runtime_context_with_host_transport(test_actr_id(), host_transport);
    let cases = package_hook_cases();
    let expected = cases.iter().map(|(_, name)| *name).collect::<Vec<_>>();

    for (event, _) in cases {
        observer.call(event, &ctx).await;
    }

    assert_recorded_hooks(rx, expected).await;
    recorder.abort();
}

#[cfg(feature = "wasm-engine")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wasm_package_receives_on_ready_and_on_stop() {
    let host =
        WasmHost::compile(wasm_actor_fixture::WASM_ACTOR_FIXTURE).expect("compile component");
    let mut workload = instantiate_wasm_workload(&host)
        .await
        .expect("instantiate wasm workload");
    let (bridge, rx) = recording_bridge();

    workload
        .call_on_ready(test_ctx_with_request_id("lifecycle:on_ready"), &bridge)
        .await
        .expect("wasm on_ready should dispatch");
    workload
        .call_on_stop(test_ctx_with_request_id("lifecycle:on_stop"), &bridge)
        .await
        .expect("wasm on_stop should dispatch");

    assert_recorded_hooks(rx, vec!["on_ready", "on_stop"]).await;
}

#[cfg(feature = "dynclib-engine")]
fn fixture_so_path() -> PathBuf {
    let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
    let fixture_dir = manifest_dir.join("tests/dynclib_actor_fixture");

    let status = std::process::Command::new("cargo")
        .args(["build"])
        .current_dir(&fixture_dir)
        .status()
        .expect("failed to build dynclib fixture");
    assert!(status.success(), "dynclib fixture build failed");

    let target_dir = manifest_dir.join("../../target/core-hyper-tests-dynclib-actor-fixture/debug");
    if cfg!(target_os = "linux") {
        target_dir.join("libdynclib_actor_fixture.so")
    } else if cfg!(target_os = "macos") {
        target_dir.join("libdynclib_actor_fixture.dylib")
    } else {
        target_dir.join("dynclib_actor_fixture.dll")
    }
}

#[cfg(feature = "dynclib-engine")]
fn dynclib_init_payload() -> InitPayloadV1 {
    InitPayloadV1 {
        version: version::V1,
        actr_type: "test:fixture:0.1.0".to_string(),
        credential: Vec::new(),
        actor_id: Vec::new(),
        realm_id: 1,
    }
}

#[cfg(feature = "dynclib-engine")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dynclib_package_receives_runtime_hook_events() {
    let _guard = DYNCLIB_PACKAGE_HOOK_TEST_LOCK.lock().await;
    let host = DynclibHost::load(fixture_so_path()).expect("load dynclib fixture");
    let mut workload =
        instantiate_dynclib_workload(host, &dynclib_init_payload()).expect("instantiate dynclib");
    let (bridge, rx) = recording_bridge();
    let cases = package_hook_cases();
    let expected = cases.iter().map(|(_, name)| *name).collect::<Vec<_>>();

    for (event, _) in cases {
        workload
            .call_hook_event(event, test_ctx(), &bridge)
            .await
            .expect("dynclib hook event should dispatch");
    }

    assert_recorded_hooks(rx, expected).await;
}

#[cfg(feature = "dynclib-engine")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dynclib_package_observer_bridge_reaches_guest_hooks() {
    let _guard = DYNCLIB_PACKAGE_HOOK_TEST_LOCK.lock().await;
    let host = DynclibHost::load(fixture_so_path()).expect("load dynclib fixture");
    let workload =
        instantiate_dynclib_workload(host, &dynclib_init_payload()).expect("instantiate dynclib");
    let observer = workload.into_package_hook_observer();
    let (host_transport, rx, recorder) = recording_host_transport();
    let ctx = runtime_context_with_host_transport(test_actr_id(), host_transport);
    let cases = package_hook_cases();
    let expected = cases.iter().map(|(_, name)| *name).collect::<Vec<_>>();

    for (event, _) in cases {
        observer.call(event, &ctx).await;
    }

    assert_recorded_hooks(rx, expected).await;
    recorder.abort();
}

#[cfg(feature = "dynclib-engine")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dynclib_package_receives_on_ready_and_on_stop() {
    let _guard = DYNCLIB_PACKAGE_HOOK_TEST_LOCK.lock().await;
    let host = DynclibHost::load(fixture_so_path()).expect("load dynclib fixture");
    let mut workload =
        instantiate_dynclib_workload(host, &dynclib_init_payload()).expect("instantiate dynclib");
    let (bridge, rx) = recording_bridge();

    workload
        .call_on_ready(test_ctx_with_request_id("lifecycle:on_ready"), &bridge)
        .await
        .expect("dynclib on_ready should dispatch");
    workload
        .call_on_stop(test_ctx_with_request_id("lifecycle:on_stop"), &bridge)
        .await
        .expect("dynclib on_stop should dispatch");

    assert_recorded_hooks(rx, vec!["on_ready", "on_stop"]).await;
}