use std::sync::Arc;
use actr_framework::guest::dynclib_abi::InitPayloadV1;
use actr_framework::{BackpressureEvent, CredentialEvent, PeerEvent};
use actr_protocol::prost::Message as ProstMessage;
use actr_protocol::{
ActrError, ActrId, ActrType, DataStream, MetadataEntry, PayloadType, Realm, RpcEnvelope,
};
use wasmtime::component::{Component, HasSelf, Linker, ResourceTable};
use wasmtime::{Config, Engine, OptLevel, RegallocAlgorithm, Store};
use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
use super::component_bindings::ActrWorkloadGuest;
use super::component_bindings::actr::workload::host::Host as HostImports;
use super::component_bindings::actr::workload::types::Host as TypesHost;
use super::component_bindings::actr::workload::types::{
self as wit_types, ActrError as WitActrError, ActrId as WitActrId, ActrType as WitActrType,
BackpressureEvent as WitBackpressureEvent, CredentialEvent as WitCredentialEvent,
DataStream as WitDataStream, Dest as WitDest, PayloadType as WitPayloadType,
PeerEvent as WitPeerEvent, Realm as WitRealm, RpcEnvelope as WitRpcEnvelope,
};
use crate::wasm::error::{WasmError, WasmResult};
use crate::workload::{
HostAbiFn, HostOperation, HostOperationResult, InvocationContext, PackageHookEvent,
};
use actr_framework::guest::dynclib_abi as guest_abi;
use actr_framework::guest::dynclib_abi::{
HostCallRawV1, HostCallV1, HostDiscoverV1, HostRegisterStreamV1, HostSendDataStreamV1,
HostTellV1, HostUnregisterStreamV1,
};
/// Builds the wasmtime [`Engine`] used to compile workload components.
///
/// The Component Model and its async support are always enabled. When the
/// `ACTR_WASM_FAST_COMPILE` environment variable is set, Cranelift
/// optimizations are turned off and the single-pass register allocator is
/// selected.
///
/// # Errors
///
/// Returns [`WasmError::LoadFailed`] when wasmtime rejects the configuration.
fn build_engine() -> WasmResult<Engine> {
    let fast_compile = std::env::var_os("ACTR_WASM_FAST_COMPILE").is_some();

    let mut config = Config::new();
    config.wasm_component_model(true);
    config.wasm_component_model_async(true);
    if fast_compile {
        config.cranelift_opt_level(OptLevel::None);
        config.cranelift_regalloc_algorithm(RegallocAlgorithm::SinglePass);
    }

    match Engine::new(&config) {
        Ok(engine) => Ok(engine),
        Err(e) => Err(WasmError::LoadFailed(format!(
            "wasmtime engine construction failed: {e}"
        ))),
    }
}
/// Data stored inside the wasmtime `Store`: the WASI context plus the
/// per-dispatch state that the `actr:workload/host` imports read.
pub(crate) struct HostState {
/// WASI context handed out through `WasiView::ctx`.
wasi: WasiCtx,
/// Resource table handed out together with `wasi`.
table: ResourceTable,
/// Invocation context of the dispatch currently in flight; `None` when no
/// dispatch is active.
pub(crate) invocation: Option<InvocationContext>,
/// Bridge used to forward guest host-calls; `None` when no bridge is
/// installed.
pub(crate) host_abi: Option<HostAbiFn>,
}
/// Manual `Debug`: prints the invocation context and a `"<fn>"` placeholder
/// for the host ABI bridge; the WASI context and resource table are omitted.
impl std::fmt::Debug for HostState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The bridge is shown only as a marker, never by value.
        let bridge_marker = self.host_abi.as_ref().map(|_| "<fn>");
        let mut out = f.debug_struct("HostState");
        out.field("invocation", &self.invocation);
        out.field("host_abi", &bridge_marker);
        out.finish_non_exhaustive()
    }
}
impl HostState {
    /// Creates a fresh state with a WASI context that inherits the host's
    /// stdio, an empty resource table, and no invocation or host ABI bridge.
    fn new() -> Self {
        let mut wasi_builder = WasiCtxBuilder::new();
        wasi_builder.inherit_stdio();
        let wasi = wasi_builder.build();
        let table = ResourceTable::new();
        Self {
            wasi,
            table,
            invocation: None,
            host_abi: None,
        }
    }
}
impl WasiView for HostState {
    /// Hands wasmtime-wasi mutable access to the WASI context and the
    /// resource table stored in this state.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        let Self { wasi, table, .. } = self;
        WasiCtxView { ctx: wasi, table }
    }
}
// Empty impl: no methods of the `types` interface's `Host` trait are provided
// here. NOTE(review): presumably required by the generated bindings so that
// `HostState` satisfies the linker bounds — confirm against `component_bindings`.
impl TypesHost for HostState {}
fn forward_host_operation(
state: &HostState,
op: HostOperation,
) -> impl std::future::Future<Output = wasmtime::Result<Result<Vec<u8>, WitActrError>>> + Send + 'static
{
let host_abi = state.host_abi.clone();
async move {
let Some(host_abi) = host_abi else {
return Err(wasmtime::Error::msg(
"host ABI bridge not installed for this dispatch",
));
};
match (host_abi)(op).await {
HostOperationResult::Bytes(bytes) => Ok(Ok(bytes)),
HostOperationResult::Done => Ok(Ok(Vec::new())),
HostOperationResult::Error(code) => Ok(Err(actr_error_from_abi_code(code))),
}
}
}
/// Host side of the `actr:workload/host` imports.
///
/// The messaging, discovery and stream imports each build a [`HostOperation`]
/// and send it through `forward_host_operation`. Logging and the identity
/// getters are answered locally from [`HostState`].
impl HostImports for HostState {
    /// Forwards a `Call` operation and returns the reply bytes.
    async fn call(
        &mut self,
        target: WitDest,
        route_key: String,
        payload: Vec<u8>,
    ) -> wasmtime::Result<Result<Vec<u8>, WitActrError>> {
        let dest = wit_dest_to_v1(&target);
        let request = HostCallV1 {
            route_key,
            dest,
            payload,
        };
        forward_host_operation(self, HostOperation::Call(request)).await
    }

    /// Forwards a `Tell` operation; any reply bytes are discarded.
    async fn tell(
        &mut self,
        target: WitDest,
        route_key: String,
        payload: Vec<u8>,
    ) -> wasmtime::Result<Result<(), WitActrError>> {
        let dest = wit_dest_to_v1(&target);
        let request = HostTellV1 {
            route_key,
            dest,
            payload,
        };
        let reply = forward_host_operation(self, HostOperation::Tell(request)).await?;
        Ok(reply.map(|_| ()))
    }

    /// Forwards a `CallRaw` operation addressed to a concrete actor id.
    async fn call_raw(
        &mut self,
        target: WitActrId,
        route_key: String,
        payload: Vec<u8>,
    ) -> wasmtime::Result<Result<Vec<u8>, WitActrError>> {
        let target = wit_actr_id_to_proto(&target);
        let request = HostCallRawV1 {
            route_key,
            target,
            payload,
        };
        forward_host_operation(self, HostOperation::CallRaw(request)).await
    }

    /// Forwards a `Discover` operation and decodes the reply bytes as an
    /// `ActrId`; undecodable bytes become `DecodeFailure`.
    async fn discover(
        &mut self,
        target_type: WitActrType,
    ) -> wasmtime::Result<Result<WitActrId, WitActrError>> {
        let request = HostDiscoverV1 {
            target_type: wit_actr_type_to_proto(&target_type),
        };
        let reply = forward_host_operation(self, HostOperation::Discover(request)).await?;
        Ok(reply.and_then(|bytes| {
            ActrId::decode(bytes.as_slice())
                .map(|id| proto_actr_id_to_wit(&id))
                .map_err(|e| {
                    WitActrError::DecodeFailure(format!(
                        "host discover returned undecodable ActrId: {e}"
                    ))
                })
        }))
    }

    /// Forwards a `RegisterStream` operation for `stream_id`.
    async fn register_stream(
        &mut self,
        stream_id: String,
    ) -> wasmtime::Result<Result<(), WitActrError>> {
        let request = HostRegisterStreamV1 { stream_id };
        let reply = forward_host_operation(self, HostOperation::RegisterStream(request)).await?;
        Ok(reply.map(|_| ()))
    }

    /// Forwards an `UnregisterStream` operation for `stream_id`.
    async fn unregister_stream(
        &mut self,
        stream_id: String,
    ) -> wasmtime::Result<Result<(), WitActrError>> {
        let request = HostUnregisterStreamV1 { stream_id };
        let reply = forward_host_operation(self, HostOperation::UnregisterStream(request)).await?;
        Ok(reply.map(|_| ()))
    }

    /// Forwards a `SendDataStream` operation carrying one chunk.
    async fn send_data_stream(
        &mut self,
        target: WitDest,
        chunk: WitDataStream,
        payload_type: WitPayloadType,
    ) -> wasmtime::Result<Result<(), WitActrError>> {
        let dest = wit_dest_to_v1(&target);
        let chunk = wit_data_stream_to_proto(chunk);
        let payload_type = wit_payload_type_to_proto(payload_type) as i32;
        let request = HostSendDataStreamV1 {
            dest,
            chunk,
            payload_type,
        };
        let reply = forward_host_operation(self, HostOperation::SendDataStream(request)).await?;
        Ok(reply.map(|_| ()))
    }

    /// Emits a guest log line through `tracing` under the `wasm-guest` target.
    ///
    /// Unrecognized level names are logged at `info` with the original level
    /// attached as a field.
    async fn log_message(&mut self, level: String, message: String) -> wasmtime::Result<()> {
        let level_name = level.as_str();
        match level_name {
            "trace" => tracing::trace!(target: "wasm-guest", "{message}"),
            "debug" => tracing::debug!(target: "wasm-guest", "{message}"),
            "info" => tracing::info!(target: "wasm-guest", "{message}"),
            "warn" => tracing::warn!(target: "wasm-guest", "{message}"),
            "error" => tracing::error!(target: "wasm-guest", "{message}"),
            other => tracing::info!(target: "wasm-guest", level = %other, "{message}"),
        }
        Ok(())
    }

    /// Returns the actor id of the running workload; fails when no
    /// invocation context is installed.
    async fn get_self_id(&mut self) -> wasmtime::Result<WitActrId> {
        let Some(ctx) = self.invocation.as_ref() else {
            return Err(wasmtime::Error::msg(
                "get_self_id called outside of an active dispatch (no invocation context installed)",
            ));
        };
        Ok(proto_actr_id_to_wit(&ctx.self_id))
    }

    /// Returns the caller's actor id when the invocation context has one;
    /// fails when no invocation context is installed.
    async fn get_caller_id(&mut self) -> wasmtime::Result<Option<WitActrId>> {
        let Some(ctx) = self.invocation.as_ref() else {
            return Err(wasmtime::Error::msg(
                "get_caller_id called outside of an active dispatch (no invocation context installed)",
            ));
        };
        let caller = ctx.caller_id.as_ref();
        Ok(caller.map(proto_actr_id_to_wit))
    }

    /// Returns the request id of the current invocation; fails when no
    /// invocation context is installed.
    async fn get_request_id(&mut self) -> wasmtime::Result<String> {
        let Some(ctx) = self.invocation.as_ref() else {
            return Err(wasmtime::Error::msg(
                "get_request_id called outside of an active dispatch (no invocation context installed)",
            ));
        };
        Ok(ctx.request_id.clone())
    }
}
/// Converts a WIT realm into its protocol counterpart by copying `realm_id`.
fn wit_realm_to_proto(r: &WitRealm) -> Realm {
    let realm_id = r.realm_id;
    Realm { realm_id }
}
/// Converts a protocol realm into its WIT counterpart by copying `realm_id`.
fn proto_realm_to_wit(r: &Realm) -> WitRealm {
    let realm_id = r.realm_id;
    WitRealm { realm_id }
}
/// Converts a WIT actor type into the protocol type, cloning each field.
fn wit_actr_type_to_proto(t: &WitActrType) -> ActrType {
    let manufacturer = t.manufacturer.clone();
    let name = t.name.clone();
    let version = t.version.clone();
    ActrType {
        manufacturer,
        name,
        version,
    }
}
/// Converts a protocol actor type into the WIT type, cloning each field.
fn proto_actr_type_to_wit(t: &ActrType) -> WitActrType {
    let manufacturer = t.manufacturer.clone();
    let name = t.name.clone();
    let version = t.version.clone();
    WitActrType {
        manufacturer,
        name,
        version,
    }
}
/// Converts a WIT actor id (realm, serial number, type) into the protocol
/// `ActrId`.
fn wit_actr_id_to_proto(id: &WitActrId) -> ActrId {
    let realm = wit_realm_to_proto(&id.realm);
    let actr_type = wit_actr_type_to_proto(&id.type_);
    ActrId {
        realm,
        serial_number: id.serial_number,
        r#type: actr_type,
    }
}
/// Converts a protocol `ActrId` (realm, serial number, type) into the WIT
/// actor id.
fn proto_actr_id_to_wit(id: &ActrId) -> WitActrId {
    let realm = proto_realm_to_wit(&id.realm);
    let actr_type = proto_actr_type_to_wit(&id.r#type);
    WitActrId {
        realm,
        serial_number: id.serial_number,
        type_: actr_type,
    }
}
/// Maps a WIT destination onto the guest ABI `DestV1`, converting the actor
/// id for the `Actor` case.
fn wit_dest_to_v1(dest: &WitDest) -> guest_abi::DestV1 {
    match dest {
        WitDest::Actor(actor_id) => {
            let proto_id = wit_actr_id_to_proto(actor_id);
            guest_abi::DestV1::actor(proto_id)
        }
        WitDest::Local => guest_abi::DestV1::local(),
        WitDest::Shell => guest_abi::DestV1::shell(),
    }
}
fn actr_error_from_abi_code(code: i32) -> WitActrError {
match code {
guest_abi::code::GENERIC_ERROR => WitActrError::Internal("generic ABI error".into()),
guest_abi::code::INIT_FAILED => WitActrError::Internal("init failed".into()),
guest_abi::code::HANDLE_FAILED => WitActrError::Internal("handle failed".into()),
guest_abi::code::ALLOC_FAILED => WitActrError::Internal("allocation failed".into()),
guest_abi::code::PROTOCOL_ERROR => WitActrError::DecodeFailure("protocol error".into()),
guest_abi::code::BUFFER_TOO_SMALL => {
WitActrError::Internal("reply buffer too small".into())
}
guest_abi::code::UNSUPPORTED_OP => {
WitActrError::NotImplemented("unsupported ABI operation".into())
}
other => WitActrError::Internal(format!("ABI status {other}")),
}
}
/// Converts a borrowed protocol error into the matching WIT error variant,
/// cloning any message payload.
// No caller appears in the visible part of this file, hence the allow.
#[allow(dead_code)]
fn actr_error_to_wit(e: &ActrError) -> WitActrError {
    match e {
        ActrError::Unavailable(text) => WitActrError::Unavailable(text.clone()),
        ActrError::TimedOut => WitActrError::TimedOut,
        ActrError::NotFound(text) => WitActrError::NotFound(text.clone()),
        ActrError::PermissionDenied(text) => WitActrError::PermissionDenied(text.clone()),
        ActrError::InvalidArgument(text) => WitActrError::InvalidArgument(text.clone()),
        ActrError::UnknownRoute(text) => WitActrError::UnknownRoute(text.clone()),
        ActrError::DependencyNotFound {
            service_name,
            message,
        } => {
            let payload = wit_types::DependencyNotFoundPayload {
                service_name: service_name.clone(),
                message: message.clone(),
            };
            WitActrError::DependencyNotFound(payload)
        }
        ActrError::DecodeFailure(text) => WitActrError::DecodeFailure(text.clone()),
        ActrError::NotImplemented(text) => WitActrError::NotImplemented(text.clone()),
        ActrError::Internal(text) => WitActrError::Internal(text.clone()),
    }
}
/// Converts an owned WIT error into the matching protocol error variant,
/// moving any message payload across.
fn wit_actr_error_to_proto(e: WitActrError) -> ActrError {
    match e {
        WitActrError::Unavailable(text) => ActrError::Unavailable(text),
        WitActrError::TimedOut => ActrError::TimedOut,
        WitActrError::NotFound(text) => ActrError::NotFound(text),
        WitActrError::PermissionDenied(text) => ActrError::PermissionDenied(text),
        WitActrError::InvalidArgument(text) => ActrError::InvalidArgument(text),
        WitActrError::UnknownRoute(text) => ActrError::UnknownRoute(text),
        WitActrError::DependencyNotFound(payload) => {
            let service_name = payload.service_name;
            let message = payload.message;
            ActrError::DependencyNotFound {
                service_name,
                message,
            }
        }
        WitActrError::DecodeFailure(text) => ActrError::DecodeFailure(text),
        WitActrError::NotImplemented(text) => ActrError::NotImplemented(text),
        WitActrError::Internal(text) => ActrError::Internal(text),
    }
}
/// Builds the WIT envelope passed to the guest from a decoded `RpcEnvelope`.
///
/// A missing payload becomes an empty byte vector.
fn rpc_envelope_to_wit(envelope: &RpcEnvelope) -> WitRpcEnvelope {
    let payload = match &envelope.payload {
        Some(bytes) => bytes.to_vec(),
        None => Vec::new(),
    };
    WitRpcEnvelope {
        request_id: envelope.request_id.clone(),
        route_key: envelope.route_key.clone(),
        payload,
    }
}
/// Converts a protocol data-stream chunk into the WIT representation,
/// copying the payload bytes and translating each metadata entry.
fn proto_data_stream_to_wit(chunk: DataStream) -> WitDataStream {
    let payload = chunk.payload.to_vec();
    let metadata = chunk
        .metadata
        .into_iter()
        .map(|item| wit_types::MetadataEntry {
            key: item.key,
            value: item.value,
        })
        .collect();
    WitDataStream {
        stream_id: chunk.stream_id,
        sequence: chunk.sequence,
        payload,
        metadata,
        timestamp_ms: chunk.timestamp_ms,
    }
}
/// Converts a framework peer event into the WIT peer event.
fn proto_peer_event_to_wit(event: PeerEvent) -> WitPeerEvent {
    let peer = proto_actr_id_to_wit(&event.peer);
    let relayed = event.relayed;
    WitPeerEvent { peer, relayed }
}
/// Expresses `time` as seconds plus nanoseconds since the Unix epoch.
///
/// A time before the epoch is reported as the zero timestamp.
fn system_time_to_wit(time: std::time::SystemTime) -> wit_types::Timestamp {
    let since_epoch = match time.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    let seconds = since_epoch.as_secs();
    let nanoseconds = since_epoch.subsec_nanos();
    wit_types::Timestamp {
        seconds,
        nanoseconds,
    }
}
/// Converts a framework credential event into the WIT credential event,
/// translating the new expiry into a WIT timestamp.
fn proto_credential_event_to_wit(event: CredentialEvent) -> WitCredentialEvent {
    let new_expiry = system_time_to_wit(event.new_expiry);
    WitCredentialEvent { new_expiry }
}
/// Converts a framework backpressure event into the WIT backpressure event,
/// casting both counters to `u64`.
fn proto_backpressure_event_to_wit(event: BackpressureEvent) -> WitBackpressureEvent {
    let queue_len = event.queue_len as u64;
    let threshold = event.threshold as u64;
    WitBackpressureEvent {
        queue_len,
        threshold,
    }
}
/// Converts a WIT data-stream chunk into the protocol `DataStream`, moving
/// the payload and translating each metadata entry.
fn wit_data_stream_to_proto(chunk: WitDataStream) -> DataStream {
    let metadata = chunk
        .metadata
        .into_iter()
        .map(|item| MetadataEntry {
            key: item.key,
            value: item.value,
        })
        .collect();
    DataStream {
        stream_id: chunk.stream_id,
        sequence: chunk.sequence,
        payload: chunk.payload.into(),
        metadata,
        timestamp_ms: chunk.timestamp_ms,
    }
}
/// Maps each WIT payload type onto the protocol variant of the same name.
fn wit_payload_type_to_proto(payload_type: WitPayloadType) -> PayloadType {
    match payload_type {
        WitPayloadType::MediaRtp => PayloadType::MediaRtp,
        WitPayloadType::StreamLatencyFirst => PayloadType::StreamLatencyFirst,
        WitPayloadType::StreamReliable => PayloadType::StreamReliable,
        WitPayloadType::RpcSignal => PayloadType::RpcSignal,
        WitPayloadType::RpcReliable => PayloadType::RpcReliable,
    }
}
/// A compiled workload component together with the engine that compiled it.
///
/// Built by `WasmHost::compile`; `WasmHost::instantiate` creates a runnable
/// `WasmWorkload` from it.
pub struct WasmHost {
/// Engine the component was compiled with; also used for the linker and store.
engine: Engine,
/// The compiled Component Model binary.
component: Component,
}
/// Opaque `Debug`: prints only the type name, no field contents.
impl std::fmt::Debug for WasmHost {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("WasmHost");
        out.finish_non_exhaustive()
    }
}
impl WasmHost {
    /// Compiles `wasm_bytes` as a Component Model binary on a freshly built
    /// engine.
    ///
    /// # Errors
    ///
    /// Returns [`WasmError::LoadFailed`] when the engine cannot be built or
    /// the bytes do not load as a component.
    pub fn compile(wasm_bytes: &[u8]) -> WasmResult<Self> {
        let engine = build_engine()?;
        let component = match Component::from_binary(&engine, wasm_bytes) {
            Ok(component) => component,
            Err(e) => {
                return Err(WasmError::LoadFailed(format!(
                    "wasm bytes did not load as a Component (this host \
                     requires Component Model binaries as of .actr format \
                     bump; wasmtime reported: {e})"
                )));
            }
        };
        tracing::info!(wasm_bytes = wasm_bytes.len(), "wasm Component compiled");
        Ok(Self { engine, component })
    }

    /// Instantiates the compiled component in a new store.
    ///
    /// A new linker is populated with the WASI p2 imports and the
    /// `actr:workload/host` imports, then the component is instantiated
    /// asynchronously against a fresh [`HostState`].
    ///
    /// # Errors
    ///
    /// Returns [`WasmError::LoadFailed`] when either linker registration or
    /// the instantiation itself fails.
    pub(crate) async fn instantiate(&self) -> WasmResult<WasmWorkload> {
        let mut linker = Linker::<HostState>::new(&self.engine);

        if let Err(e) = wasmtime_wasi::p2::add_to_linker_async(&mut linker) {
            return Err(WasmError::LoadFailed(format!(
                "failed to register WASI p2 linker imports: {e}"
            )));
        }
        if let Err(e) = ActrWorkloadGuest::add_to_linker::<_, HasSelf<_>>(&mut linker, |s| s) {
            return Err(WasmError::LoadFailed(format!(
                "failed to register actr:workload/host linker imports: {e}"
            )));
        }

        let mut store = Store::new(&self.engine, HostState::new());
        let instantiated =
            ActrWorkloadGuest::instantiate_async(&mut store, &self.component, &linker).await;
        let bindings = match instantiated {
            Ok(bindings) => bindings,
            Err(e) => {
                return Err(WasmError::LoadFailed(format!(
                    "Component instantiate_async failed: {e:#}"
                )));
            }
        };
        tracing::info!("wasm Component instantiated");
        Ok(WasmWorkload { store, bindings })
    }
}
/// An instantiated workload component: the store that owns its `HostState`
/// and the generated bindings used to call its exports.
pub(crate) struct WasmWorkload {
/// Store holding the instance and the per-dispatch `HostState`.
store: Store<HostState>,
/// Generated guest bindings for the instantiated component.
bindings: ActrWorkloadGuest,
}
/// Opaque `Debug`: prints only the type name, no field contents.
impl std::fmt::Debug for WasmWorkload {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("WasmWorkload");
        out.finish_non_exhaustive()
    }
}
/// Entry points the host uses to drive an instantiated workload.
///
/// Every guest call follows the same shape: install the invocation context
/// and host ABI bridge into the store, await the guest export, then clear
/// both again before inspecting the result.
///
/// Trap errors are formatted with `{:#}` so the whole `wasmtime::Error`
/// context chain ends up in the message. Plain `{}` prints only the outermost
/// context and drops the underlying cause.
impl WasmWorkload {
    /// Logs the init payload at debug level and returns `Ok(())`; no guest
    /// code is called here.
    pub(crate) fn init(&mut self, init_payload: &InitPayloadV1) -> WasmResult<()> {
        tracing::debug!(
            actr_type = %init_payload.actr_type,
            realm_id = init_payload.realm_id,
            "wasm Component workload init (Component-model lifecycle handles this implicitly)"
        );
        Ok(())
    }

    /// Stores `ctx` and a clone of the `host_abi` handle in the store's
    /// `HostState` so host imports can use them during the next guest call.
    fn install_invocation(&mut self, ctx: InvocationContext, host_abi: &HostAbiFn) {
        let host_abi_clone: HostAbiFn = Arc::clone(host_abi);
        let state = self.store.data_mut();
        state.invocation = Some(ctx);
        state.host_abi = Some(host_abi_clone);
    }

    /// Removes the invocation context and host ABI bridge from the store.
    fn clear_invocation(&mut self) {
        let state = self.store.data_mut();
        state.invocation = None;
        state.host_abi = None;
    }

    /// Calls the guest's `on_start` export.
    ///
    /// # Errors
    ///
    /// Returns [`WasmError::ExecutionFailed`] when the call traps or the
    /// guest returns an error.
    pub(crate) async fn call_on_start(
        &mut self,
        ctx: InvocationContext,
        host_abi: &HostAbiFn,
    ) -> WasmResult<()> {
        self.install_invocation(ctx, host_abi);
        let result = self
            .bindings
            .actr_workload_workload()
            .call_on_start(&mut self.store)
            .await;
        self.clear_invocation();
        let result =
            result.map_err(|e| WasmError::ExecutionFailed(format!("on_start trap: {e:#}")))?;
        result.map_err(|e| WasmError::ExecutionFailed(format!("on_start error: {:?}", e)))?;
        Ok(())
    }

    /// Calls the guest's `on_ready` export.
    ///
    /// # Errors
    ///
    /// Returns [`WasmError::ExecutionFailed`] when the call traps or the
    /// guest returns an error.
    pub(crate) async fn call_on_ready(
        &mut self,
        ctx: InvocationContext,
        host_abi: &HostAbiFn,
    ) -> WasmResult<()> {
        self.install_invocation(ctx, host_abi);
        let result = self
            .bindings
            .actr_workload_workload()
            .call_on_ready(&mut self.store)
            .await;
        self.clear_invocation();
        let result =
            result.map_err(|e| WasmError::ExecutionFailed(format!("on_ready trap: {e:#}")))?;
        result.map_err(|e| WasmError::ExecutionFailed(format!("on_ready error: {:?}", e)))?;
        Ok(())
    }

    /// Calls the guest's `on_stop` export.
    ///
    /// # Errors
    ///
    /// Returns [`WasmError::ExecutionFailed`] when the call traps or the
    /// guest returns an error.
    pub(crate) async fn call_on_stop(
        &mut self,
        ctx: InvocationContext,
        host_abi: &HostAbiFn,
    ) -> WasmResult<()> {
        self.install_invocation(ctx, host_abi);
        let result = self
            .bindings
            .actr_workload_workload()
            .call_on_stop(&mut self.store)
            .await;
        self.clear_invocation();
        let result =
            result.map_err(|e| WasmError::ExecutionFailed(format!("on_stop trap: {e:#}")))?;
        result.map_err(|e| WasmError::ExecutionFailed(format!("on_stop error: {:?}", e)))?;
        Ok(())
    }

    /// Delivers one hook event to the matching guest export.
    ///
    /// Event payloads are converted to their WIT form before the call. The
    /// value of `event.request_id()` is used as the label in the error
    /// message.
    ///
    /// # Errors
    ///
    /// Returns [`WasmError::ExecutionFailed`] when the guest call traps.
    pub(crate) async fn call_hook_event(
        &mut self,
        event: PackageHookEvent,
        ctx: InvocationContext,
        host_abi: &HostAbiFn,
    ) -> WasmResult<()> {
        let label = event.request_id();
        self.install_invocation(ctx, host_abi);
        let result = match event {
            PackageHookEvent::SignalingConnecting => {
                self.bindings
                    .actr_workload_workload()
                    .call_on_signaling_connecting(&mut self.store)
                    .await
            }
            PackageHookEvent::SignalingConnected => {
                self.bindings
                    .actr_workload_workload()
                    .call_on_signaling_connected(&mut self.store)
                    .await
            }
            PackageHookEvent::SignalingDisconnected => {
                self.bindings
                    .actr_workload_workload()
                    .call_on_signaling_disconnected(&mut self.store)
                    .await
            }
            PackageHookEvent::WebSocketConnecting(event) => {
                let event = proto_peer_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_websocket_connecting(&mut self.store, &event)
                    .await
            }
            PackageHookEvent::WebSocketConnected(event) => {
                let event = proto_peer_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_websocket_connected(&mut self.store, &event)
                    .await
            }
            PackageHookEvent::WebSocketDisconnected(event) => {
                let event = proto_peer_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_websocket_disconnected(&mut self.store, &event)
                    .await
            }
            PackageHookEvent::WebRtcConnecting(event) => {
                let event = proto_peer_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_webrtc_connecting(&mut self.store, &event)
                    .await
            }
            PackageHookEvent::WebRtcConnected(event) => {
                let event = proto_peer_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_webrtc_connected(&mut self.store, &event)
                    .await
            }
            PackageHookEvent::WebRtcDisconnected(event) => {
                let event = proto_peer_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_webrtc_disconnected(&mut self.store, &event)
                    .await
            }
            PackageHookEvent::CredentialRenewed(event) => {
                let event = proto_credential_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_credential_renewed(&mut self.store, event)
                    .await
            }
            PackageHookEvent::CredentialExpiring(event) => {
                let event = proto_credential_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_credential_expiring(&mut self.store, event)
                    .await
            }
            PackageHookEvent::MailboxBackpressure(event) => {
                let event = proto_backpressure_event_to_wit(event);
                self.bindings
                    .actr_workload_workload()
                    .call_on_mailbox_backpressure(&mut self.store, event)
                    .await
            }
        };
        self.clear_invocation();
        result.map_err(|e| WasmError::ExecutionFailed(format!("{label} trap: {e:#}")))?;
        Ok(())
    }

    /// Dispatches one RPC request to the guest and returns its reply bytes.
    ///
    /// `request_bytes` is decoded as an `RpcEnvelope` on the host before any
    /// state is installed, so a malformed request never reaches the guest.
    ///
    /// # Errors
    ///
    /// Returns [`WasmError::ExecutionFailed`] when the envelope cannot be
    /// decoded, the guest returns an error, or the call traps.
    pub(crate) async fn handle(
        &mut self,
        request_bytes: &[u8],
        ctx: InvocationContext,
        host_abi: &HostAbiFn,
    ) -> WasmResult<Vec<u8>> {
        let envelope = RpcEnvelope::decode(request_bytes).map_err(|e| {
            WasmError::ExecutionFailed(format!(
                "host failed to decode RpcEnvelope before dispatch: {e}"
            ))
        })?;
        self.install_invocation(ctx, host_abi);
        let wit_envelope = rpc_envelope_to_wit(&envelope);
        let dispatch_result = self
            .bindings
            .actr_workload_workload()
            .call_dispatch(&mut self.store, &wit_envelope)
            .await;
        self.clear_invocation();
        match dispatch_result {
            Ok(Ok(bytes)) => Ok(bytes),
            Ok(Err(wit_err)) => Err(WasmError::ExecutionFailed(format!(
                "guest dispatch returned error: {:?}",
                wit_actr_error_to_proto(wit_err)
            ))),
            Err(trap) => Err(WasmError::ExecutionFailed(format!(
                "guest dispatch trapped: {trap:#}"
            ))),
        }
    }

    /// Delivers one data-stream chunk from `sender` to the guest's
    /// `on_data_stream` export.
    ///
    /// # Errors
    ///
    /// Returns [`WasmError::ExecutionFailed`] when the call traps or the
    /// guest returns an error.
    pub(crate) async fn handle_data_stream(
        &mut self,
        chunk: DataStream,
        sender: ActrId,
        ctx: InvocationContext,
        host_abi: &HostAbiFn,
    ) -> WasmResult<()> {
        self.install_invocation(ctx, host_abi);
        let wit_chunk = proto_data_stream_to_wit(chunk);
        let wit_sender = proto_actr_id_to_wit(&sender);
        let result = self
            .bindings
            .actr_workload_workload()
            .call_on_data_stream(&mut self.store, &wit_chunk, &wit_sender)
            .await;
        self.clear_invocation();
        let result = result
            .map_err(|e| WasmError::ExecutionFailed(format!("on_data_stream trap: {e:#}")))?;
        result.map_err(|e| {
            WasmError::ExecutionFailed(format!(
                "on_data_stream error: {:?}",
                wit_actr_error_to_proto(e)
            ))
        })?;
        Ok(())
    }
}