#[cfg(feature = "wasm-engine")]
use crate::HostOperationResult;
use crate::{BinaryKind, Hyper, WorkloadPackage};
#[cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]
use crate::{HostAbiFn, InvocationContext};
#[cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]
use actr_framework::guest::dynclib_abi::InitPayloadV1;
use actr_pack::PackageManifest;
use actr_protocol::{AIdCredential, ActrId};
use std::sync::Arc;
#[path = "../../tests/common/harness.rs"]
pub mod harness;
#[path = "../../tests/common/signaling.rs"]
pub mod signaling;
#[path = "../../tests/common/utils.rs"]
pub mod utils;
#[path = "../../tests/common/vnet.rs"]
pub mod vnet;
pub use harness::{TestHarness, TestPeer};
pub use signaling::TestSignalingServer;
pub use utils::{
create_credential_state_for_test, create_peer_with_vnet, create_peer_with_websocket,
dummy_credential, make_actor_id, spawn_echo_responder, spawn_response_receiver,
};
pub use vnet::{VNetPair, create_vnet_pair};
pub use crate::transport::lane::{
WebRtcFragmentSendEvent, WebRtcFragmentSendHook, WebRtcFragmentSendHookGuard,
install_webrtc_fragment_send_hook_for_test,
};
pub fn attached_node_has_hook_observer(node: &crate::Node<crate::Attached>) -> bool {
node.attachment
.as_ref()
.expect("Node<Attached> without attachment")
.node
.hook_observer
.is_some()
}
pub fn runtime_context_with_host_transport(
self_id: ActrId,
host_transport: Arc<crate::transport::HostTransport>,
) -> crate::context::RuntimeContext {
use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
use crate::outbound::{Gate, HostGate};
use crate::wire::webrtc::{
ReconnectConfig, SignalingClient, SignalingConfig, WebSocketSignalingClient,
};
let inproc_gate = Gate::Host(Arc::new(HostGate::new(host_transport)));
let outproc_gate = Some(inproc_gate.clone());
let signaling_client: Arc<dyn SignalingClient> =
Arc::new(WebSocketSignalingClient::new(SignalingConfig {
server_url: url::Url::parse("ws://127.0.0.1:9").expect("valid test URL"),
connection_timeout: 1,
heartbeat_interval: 30,
reconnect_config: ReconnectConfig::default(),
auth_config: None,
webrtc_role: None,
}));
crate::context::RuntimeContext::new(
self_id,
None,
"package-hook-observer-test".to_string(),
inproc_gate,
outproc_gate,
Arc::new(DataStreamRegistry::new()),
Arc::new(MediaFrameRegistry::new()),
signaling_client,
AIdCredential {
key_id: 1,
claims: bytes::Bytes::from_static(b"claims"),
signature: bytes::Bytes::from(vec![0; 64]),
},
None,
)
}
#[cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]
#[derive(Debug, Clone)]
pub enum TestPackageHookEvent {
SignalingConnecting,
SignalingConnected,
SignalingDisconnected,
WebSocketConnecting { peer: ActrId },
WebSocketConnected { peer: ActrId },
WebSocketDisconnected { peer: ActrId },
WebRtcConnecting { peer: ActrId },
WebRtcConnected { peer: ActrId, relayed: bool },
WebRtcDisconnected { peer: ActrId },
CredentialRenewed { new_expiry: std::time::SystemTime },
CredentialExpiring { new_expiry: std::time::SystemTime },
MailboxBackpressure { queue_len: usize, threshold: usize },
}
#[cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]
impl From<TestPackageHookEvent> for crate::workload::PackageHookEvent {
fn from(event: TestPackageHookEvent) -> Self {
match event {
TestPackageHookEvent::SignalingConnecting => Self::SignalingConnecting,
TestPackageHookEvent::SignalingConnected => Self::SignalingConnected,
TestPackageHookEvent::SignalingDisconnected => Self::SignalingDisconnected,
TestPackageHookEvent::WebSocketConnecting { peer } => {
Self::WebSocketConnecting(actr_framework::PeerEvent {
peer,
relayed: None,
})
}
TestPackageHookEvent::WebSocketConnected { peer } => {
Self::WebSocketConnected(actr_framework::PeerEvent {
peer,
relayed: None,
})
}
TestPackageHookEvent::WebSocketDisconnected { peer } => {
Self::WebSocketDisconnected(actr_framework::PeerEvent {
peer,
relayed: None,
})
}
TestPackageHookEvent::WebRtcConnecting { peer } => {
Self::WebRtcConnecting(actr_framework::PeerEvent {
peer,
relayed: None,
})
}
TestPackageHookEvent::WebRtcConnected { peer, relayed } => {
Self::WebRtcConnected(actr_framework::PeerEvent {
peer,
relayed: Some(relayed),
})
}
TestPackageHookEvent::WebRtcDisconnected { peer } => {
Self::WebRtcDisconnected(actr_framework::PeerEvent {
peer,
relayed: None,
})
}
TestPackageHookEvent::CredentialRenewed { new_expiry } => {
Self::CredentialRenewed(actr_framework::CredentialEvent { new_expiry })
}
TestPackageHookEvent::CredentialExpiring { new_expiry } => {
Self::CredentialExpiring(actr_framework::CredentialEvent { new_expiry })
}
TestPackageHookEvent::MailboxBackpressure {
queue_len,
threshold,
} => Self::MailboxBackpressure(actr_framework::BackpressureEvent {
queue_len,
threshold,
}),
}
}
}
#[cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]
pub struct TestPackageHookObserver {
observer: crate::workload::PackageHookObserver,
}
#[cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]
impl TestPackageHookObserver {
fn from_workload(workload: crate::workload::Workload) -> Self {
let workload_dispatch = Arc::new(tokio::sync::Mutex::new(workload));
Self {
observer: crate::workload::PackageHookObserver { workload_dispatch },
}
}
pub async fn call(&self, event: TestPackageHookEvent, ctx: &crate::context::RuntimeContext) {
use crate::lifecycle::hooks::WorkloadHookObserver as _;
match event {
TestPackageHookEvent::SignalingConnecting => {
self.observer.on_signaling_connecting(Some(ctx)).await;
}
TestPackageHookEvent::SignalingConnected => {
self.observer.on_signaling_connected(Some(ctx)).await;
}
TestPackageHookEvent::SignalingDisconnected => {
self.observer.on_signaling_disconnected(ctx).await;
}
TestPackageHookEvent::WebSocketConnecting { peer } => {
self.observer
.on_websocket_connecting(
ctx,
&actr_framework::PeerEvent {
peer,
relayed: None,
},
)
.await;
}
TestPackageHookEvent::WebSocketConnected { peer } => {
self.observer
.on_websocket_connected(
ctx,
&actr_framework::PeerEvent {
peer,
relayed: None,
},
)
.await;
}
TestPackageHookEvent::WebSocketDisconnected { peer } => {
self.observer
.on_websocket_disconnected(
ctx,
&actr_framework::PeerEvent {
peer,
relayed: None,
},
)
.await;
}
TestPackageHookEvent::WebRtcConnecting { peer } => {
self.observer
.on_webrtc_connecting(
ctx,
&actr_framework::PeerEvent {
peer,
relayed: None,
},
)
.await;
}
TestPackageHookEvent::WebRtcConnected { peer, relayed } => {
self.observer
.on_webrtc_connected(
ctx,
&actr_framework::PeerEvent {
peer,
relayed: Some(relayed),
},
)
.await;
}
TestPackageHookEvent::WebRtcDisconnected { peer } => {
self.observer
.on_webrtc_disconnected(
ctx,
&actr_framework::PeerEvent {
peer,
relayed: None,
},
)
.await;
}
TestPackageHookEvent::CredentialRenewed { new_expiry } => {
self.observer
.on_credential_renewed(ctx, &actr_framework::CredentialEvent { new_expiry })
.await;
}
TestPackageHookEvent::CredentialExpiring { new_expiry } => {
self.observer
.on_credential_expiring(ctx, &actr_framework::CredentialEvent { new_expiry })
.await;
}
TestPackageHookEvent::MailboxBackpressure {
queue_len,
threshold,
} => {
self.observer
.on_mailbox_backpressure(
ctx,
&actr_framework::BackpressureEvent {
queue_len,
threshold,
},
)
.await;
}
}
}
}
#[derive(Debug, Clone)]
pub struct LoadedWorkloadSummary {
pub binary_kind: BinaryKind,
manifest: PackageManifest,
}
impl LoadedWorkloadSummary {
pub fn manifest(&self) -> &PackageManifest {
&self.manifest
}
}
pub async fn inspect_workload_package(
hyper: &Hyper,
package: &WorkloadPackage,
) -> crate::error::HyperResult<LoadedWorkloadSummary> {
let loaded = hyper.load_workload_package(package).await?;
Ok(LoadedWorkloadSummary {
binary_kind: loaded.binary_kind,
manifest: loaded.verified.manifest,
})
}
#[cfg(feature = "wasm-engine")]
#[derive(Debug)]
pub struct TestWasmWorkload {
inner: crate::wasm::WasmWorkload,
}
#[cfg(feature = "wasm-engine")]
impl TestWasmWorkload {
pub fn init(&mut self, init_payload: &InitPayloadV1) -> Result<(), crate::wasm::WasmError> {
self.inner.init(init_payload)
}
pub async fn call_on_start(&mut self) -> Result<(), crate::wasm::WasmError> {
let ctx = InvocationContext {
self_id: actr_protocol::ActrId::default(),
caller_id: None,
request_id: "test:on_start".to_string(),
};
let host_abi: HostAbiFn =
std::sync::Arc::new(|_| Box::pin(async { HostOperationResult::Done }));
self.inner.call_on_start(ctx, &host_abi).await
}
pub async fn call_on_ready(
&mut self,
ctx: InvocationContext,
host_abi: &HostAbiFn,
) -> Result<(), crate::wasm::WasmError> {
self.inner.call_on_ready(ctx, host_abi).await
}
pub async fn call_on_stop(
&mut self,
ctx: InvocationContext,
host_abi: &HostAbiFn,
) -> Result<(), crate::wasm::WasmError> {
self.inner.call_on_stop(ctx, host_abi).await
}
pub async fn call_hook_event(
&mut self,
event: TestPackageHookEvent,
ctx: InvocationContext,
host_abi: &HostAbiFn,
) -> Result<(), crate::wasm::WasmError> {
self.inner
.call_hook_event(event.into(), ctx, host_abi)
.await
}
pub async fn handle(
&mut self,
request_bytes: &[u8],
ctx: InvocationContext,
host_abi: &HostAbiFn,
) -> Result<Vec<u8>, crate::wasm::WasmError> {
self.inner.handle(request_bytes, ctx, host_abi).await
}
pub fn into_package_hook_observer(self) -> TestPackageHookObserver {
TestPackageHookObserver::from_workload(crate::workload::Workload::Wasm(self.inner))
}
}
#[cfg(feature = "wasm-engine")]
pub async fn instantiate_wasm_workload(
host: &crate::wasm::WasmHost,
) -> Result<TestWasmWorkload, crate::wasm::WasmError> {
Ok(TestWasmWorkload {
inner: host.instantiate().await?,
})
}
#[cfg(feature = "dynclib-engine")]
#[derive(Debug)]
pub struct TestDynclibWorkload {
inner: crate::dynclib::DynClibWorkload,
}
#[cfg(feature = "dynclib-engine")]
impl TestDynclibWorkload {
pub async fn handle(
&mut self,
request_bytes: &[u8],
ctx: InvocationContext,
call_executor: &HostAbiFn,
) -> Result<Vec<u8>, crate::dynclib::DynclibError> {
self.inner.handle(request_bytes, ctx, call_executor).await
}
pub async fn call_hook_event(
&mut self,
event: TestPackageHookEvent,
ctx: InvocationContext,
call_executor: &HostAbiFn,
) -> Result<(), crate::dynclib::DynclibError> {
self.inner
.call_hook_event(event.into(), ctx, call_executor)
.await
}
pub async fn call_on_ready(
&mut self,
ctx: InvocationContext,
call_executor: &HostAbiFn,
) -> Result<(), crate::dynclib::DynclibError> {
self.inner.call_on_ready(ctx, call_executor).await
}
pub async fn call_on_stop(
&mut self,
ctx: InvocationContext,
call_executor: &HostAbiFn,
) -> Result<(), crate::dynclib::DynclibError> {
self.inner.call_on_stop(ctx, call_executor).await
}
pub fn into_package_hook_observer(self) -> TestPackageHookObserver {
TestPackageHookObserver::from_workload(crate::workload::Workload::DynClib(self.inner))
}
}
#[cfg(feature = "dynclib-engine")]
pub fn instantiate_dynclib_workload(
host: crate::dynclib::DynclibHost,
init_payload: &InitPayloadV1,
) -> Result<TestDynclibWorkload, crate::dynclib::DynclibError> {
let instance = host.instantiate(init_payload)?;
Ok(TestDynclibWorkload {
inner: crate::dynclib::DynClibWorkload::new(host, instance),
})
}