use std::sync::OnceLock;
use actr_protocol::{ActrError, RpcEnvelope};
use bytes::Bytes;
use crate::workload::{BackpressureEvent, CredentialEvent, ErrorCategory, ErrorEvent, PeerEvent};
use crate::{MessageDispatcher, Workload};
use super::context::{WasmContext, proto_actr_error_to_wit, wit_actr_error_to_proto};
use super::generated::actr::workload::types as wit_types;
/// Write-once holder for a workload value of type `W`.
///
/// Thin wrapper around [`OnceLock`]: the value is absent until the first
/// [`WorkloadCell::get_or_init`] call and is never replaced afterwards.
pub struct WorkloadCell<W: 'static> {
    /// Backing storage; empty until the first `get_or_init` call.
    cell: OnceLock<W>,
}

impl<W: 'static> WorkloadCell<W> {
    /// Creates an empty cell.
    ///
    /// This is a `const fn`, so it can be used to initialize a `static`.
    pub const fn new() -> Self {
        WorkloadCell {
            cell: OnceLock::new(),
        }
    }

    /// Returns the stored value, running `init` to create it if the cell is
    /// still empty.
    ///
    /// Once a value is stored, later calls return that same value and do not
    /// use their `init` closure.
    pub fn get_or_init<F>(&self, init: F) -> &W
    where
        F: FnOnce() -> W,
    {
        let Self { cell } = self;
        cell.get_or_init(init)
    }
}

impl<W: 'static> Default for WorkloadCell<W> {
    /// Equivalent to [`WorkloadCell::new`]: an empty cell.
    fn default() -> Self {
        WorkloadCell::new()
    }
}
/// Converts an `actr_protocol` [`ActrError`] into the WIT binding's
/// `ActrError` type.
///
/// Delegates directly to `proto_actr_error_to_wit`; this wrapper only gives
/// the conversion a crate-visible name in this module.
pub(crate) fn actr_error_to_wit(e: ActrError) -> wit_types::ActrError {
proto_actr_error_to_wit(e)
}
/// Converts the WIT binding's `ActrError` into an `actr_protocol`
/// [`ActrError`].
///
/// Delegates directly to `wit_actr_error_to_proto`; the inverse direction of
/// `actr_error_to_wit`.
pub(crate) fn actr_error_from_wit(e: wit_types::ActrError) -> ActrError {
wit_actr_error_to_proto(e)
}
/// Converts a WIT peer event into the workload-facing [`PeerEvent`].
///
/// The peer id is translated through `context_helpers::actr_id_from_wit`;
/// the `relayed` field is carried over unchanged.
fn peer_event_from_wit(e: wit_types::PeerEvent) -> PeerEvent {
    let peer = super::context_helpers::actr_id_from_wit(&e.peer);
    let relayed = e.relayed;
    PeerEvent { peer, relayed }
}
fn error_category_from_wit(c: wit_types::ErrorCategory) -> ErrorCategory {
match c {
wit_types::ErrorCategory::HandlerPanic => ErrorCategory::HandlerPanic,
wit_types::ErrorCategory::HandlerError => ErrorCategory::HandlerError,
wit_types::ErrorCategory::SignalingFailure => ErrorCategory::SignalingFailure,
wit_types::ErrorCategory::TransportFailure => ErrorCategory::TransportFailure,
wit_types::ErrorCategory::DataStreamDeliveryUncertain => {
ErrorCategory::DataStreamDeliveryUncertain
}
}
}
/// Converts a WIT timestamp (`seconds` + `nanoseconds`) into a `SystemTime`
/// by treating it as an offset from `UNIX_EPOCH`.
///
/// # Panics
///
/// Panics if `Duration::new` overflows while carrying excess nanoseconds into
/// the seconds counter, or if the resulting instant cannot be represented by
/// `SystemTime` on the current platform.
fn timestamp_from_wit(t: wit_types::Timestamp) -> std::time::SystemTime {
    use std::time::{Duration, UNIX_EPOCH};

    let since_epoch = Duration::new(t.seconds, t.nanoseconds);
    UNIX_EPOCH + since_epoch
}
/// Converts a WIT error event into the workload-facing [`ErrorEvent`].
///
/// Each field is translated with its dedicated converter; `context` is moved
/// across unchanged.
fn error_event_from_wit(e: wit_types::ErrorEvent) -> ErrorEvent {
    let wit_types::ErrorEvent {
        source,
        category,
        context,
        timestamp,
        ..
    } = e;

    // Converted in the same order as the target struct's fields are listed.
    let source = actr_error_from_wit(source);
    let category = error_category_from_wit(category);
    let timestamp = timestamp_from_wit(timestamp);

    ErrorEvent {
        source,
        category,
        context,
        timestamp,
    }
}
/// Converts a WIT credential event into the workload-facing
/// [`CredentialEvent`], translating `new_expiry` via `timestamp_from_wit`.
fn credential_event_from_wit(e: wit_types::CredentialEvent) -> CredentialEvent {
    let new_expiry = timestamp_from_wit(e.new_expiry);
    CredentialEvent { new_expiry }
}
fn backpressure_event_from_wit(e: wit_types::BackpressureEvent) -> BackpressureEvent {
BackpressureEvent {
queue_len: e.queue_len as usize,
threshold: e.threshold as usize,
}
}
/// Builds a protocol [`RpcEnvelope`] from its WIT counterpart.
///
/// `request_id` and `route_key` are moved across as-is. An empty WIT payload
/// becomes `None`; a non-empty one is wrapped in [`Bytes`]. Every other
/// `RpcEnvelope` field is left at its `Default` value.
fn rpc_envelope_from_wit(envelope: wit_types::RpcEnvelope) -> RpcEnvelope {
    let payload = match envelope.payload {
        raw if raw.is_empty() => None,
        raw => Some(Bytes::from(raw)),
    };

    RpcEnvelope {
        request_id: envelope.request_id,
        route_key: envelope.route_key,
        payload,
        ..Default::default()
    }
}
pub async fn run_dispatch<W: Workload>(
workload: &W,
envelope: wit_types::RpcEnvelope,
) -> Result<Vec<u8>, wit_types::ActrError> {
let ctx = WasmContext::from_host().await;
let envelope = rpc_envelope_from_wit(envelope);
match <W::Dispatcher as MessageDispatcher>::dispatch(workload, envelope, &ctx).await {
Ok(bytes) => Ok(bytes.to_vec()),
Err(e) => Err(actr_error_to_wit(e)),
}
}
pub async fn run_on_start<W: Workload>(workload: &W) -> Result<(), wit_types::ActrError> {
let ctx = WasmContext::from_host().await;
match workload.on_start(&ctx).await {
Ok(()) => Ok(()),
Err(e) => Err(actr_error_to_wit(e)),
}
}
pub async fn run_on_ready<W: Workload>(workload: &W) -> Result<(), wit_types::ActrError> {
let ctx = WasmContext::from_host().await;
match workload.on_ready(&ctx).await {
Ok(()) => Ok(()),
Err(e) => Err(actr_error_to_wit(e)),
}
}
pub async fn run_on_stop<W: Workload>(workload: &W) -> Result<(), wit_types::ActrError> {
let ctx = WasmContext::from_host().await;
match workload.on_stop(&ctx).await {
Ok(()) => Ok(()),
Err(e) => Err(actr_error_to_wit(e)),
}
}
pub async fn run_on_error<W: Workload>(
workload: &W,
event: wit_types::ErrorEvent,
) -> Result<(), wit_types::ActrError> {
let ctx = WasmContext::from_host().await;
let event = error_event_from_wit(event);
match workload.on_error(&ctx, &event).await {
Ok(()) => Ok(()),
Err(e) => Err(actr_error_to_wit(e)),
}
}
/// Runs the workload's `on_signaling_connecting` hook.
///
/// This hook takes an optional context; the one obtained from
/// `WasmContext::from_host()` is always supplied as `Some`.
pub async fn run_on_signaling_connecting<W: Workload>(workload: &W) {
    let host_ctx = WasmContext::from_host().await;
    let maybe_ctx = Some(&host_ctx);
    workload.on_signaling_connecting(maybe_ctx).await;
}
/// Runs the workload's `on_signaling_connected` hook.
///
/// This hook takes an optional context; the one obtained from
/// `WasmContext::from_host()` is always supplied as `Some`.
pub async fn run_on_signaling_connected<W: Workload>(workload: &W) {
    let host_ctx = WasmContext::from_host().await;
    let maybe_ctx = Some(&host_ctx);
    workload.on_signaling_connected(maybe_ctx).await;
}
/// Runs the workload's `on_signaling_disconnected` hook.
///
/// Unlike the `connecting`/`connected` signaling hooks in this file, this
/// one is handed the context by plain reference rather than as an `Option`.
pub async fn run_on_signaling_disconnected<W: Workload>(workload: &W) {
    let host_ctx = WasmContext::from_host().await;
    workload.on_signaling_disconnected(&host_ctx).await;
}
/// Runs the workload's `on_websocket_connecting` hook for the given peer.
///
/// The WIT peer event is converted with `peer_event_from_wit` after the
/// context has been obtained from `WasmContext::from_host()`.
pub async fn run_on_websocket_connecting<W: Workload>(workload: &W, event: wit_types::PeerEvent) {
    let host_ctx = WasmContext::from_host().await;
    let peer_event = peer_event_from_wit(event);
    workload
        .on_websocket_connecting(&host_ctx, &peer_event)
        .await;
}
/// Runs the workload's `on_websocket_connected` hook for the given peer.
///
/// The WIT peer event is converted with `peer_event_from_wit` after the
/// context has been obtained from `WasmContext::from_host()`.
pub async fn run_on_websocket_connected<W: Workload>(workload: &W, event: wit_types::PeerEvent) {
    let host_ctx = WasmContext::from_host().await;
    let peer_event = peer_event_from_wit(event);
    workload
        .on_websocket_connected(&host_ctx, &peer_event)
        .await;
}
/// Runs the workload's `on_websocket_disconnected` hook for the given peer.
///
/// The WIT peer event is converted with `peer_event_from_wit` after the
/// context has been obtained from `WasmContext::from_host()`.
pub async fn run_on_websocket_disconnected<W: Workload>(workload: &W, event: wit_types::PeerEvent) {
    let host_ctx = WasmContext::from_host().await;
    let peer_event = peer_event_from_wit(event);
    workload
        .on_websocket_disconnected(&host_ctx, &peer_event)
        .await;
}
/// Runs the workload's `on_webrtc_connecting` hook for the given peer.
///
/// The WIT peer event is converted with `peer_event_from_wit` after the
/// context has been obtained from `WasmContext::from_host()`.
pub async fn run_on_webrtc_connecting<W: Workload>(workload: &W, event: wit_types::PeerEvent) {
    let host_ctx = WasmContext::from_host().await;
    let peer_event = peer_event_from_wit(event);
    workload
        .on_webrtc_connecting(&host_ctx, &peer_event)
        .await;
}
/// Runs the workload's `on_webrtc_connected` hook for the given peer.
///
/// The WIT peer event is converted with `peer_event_from_wit` after the
/// context has been obtained from `WasmContext::from_host()`.
pub async fn run_on_webrtc_connected<W: Workload>(workload: &W, event: wit_types::PeerEvent) {
    let host_ctx = WasmContext::from_host().await;
    let peer_event = peer_event_from_wit(event);
    workload
        .on_webrtc_connected(&host_ctx, &peer_event)
        .await;
}
/// Runs the workload's `on_webrtc_disconnected` hook for the given peer.
///
/// The WIT peer event is converted with `peer_event_from_wit` after the
/// context has been obtained from `WasmContext::from_host()`.
pub async fn run_on_webrtc_disconnected<W: Workload>(workload: &W, event: wit_types::PeerEvent) {
    let host_ctx = WasmContext::from_host().await;
    let peer_event = peer_event_from_wit(event);
    workload
        .on_webrtc_disconnected(&host_ctx, &peer_event)
        .await;
}
/// Runs the workload's `on_credential_renewed` hook.
///
/// The WIT credential event is converted with `credential_event_from_wit`
/// after the context has been obtained from `WasmContext::from_host()`.
pub async fn run_on_credential_renewed<W: Workload>(
    workload: &W,
    event: wit_types::CredentialEvent,
) {
    let host_ctx = WasmContext::from_host().await;
    let credential_event = credential_event_from_wit(event);
    workload
        .on_credential_renewed(&host_ctx, &credential_event)
        .await;
}
/// Runs the workload's `on_credential_expiring` hook.
///
/// The WIT credential event is converted with `credential_event_from_wit`
/// after the context has been obtained from `WasmContext::from_host()`.
pub async fn run_on_credential_expiring<W: Workload>(
    workload: &W,
    event: wit_types::CredentialEvent,
) {
    let host_ctx = WasmContext::from_host().await;
    let credential_event = credential_event_from_wit(event);
    workload
        .on_credential_expiring(&host_ctx, &credential_event)
        .await;
}
/// Runs the workload's `on_mailbox_backpressure` hook.
///
/// The WIT backpressure event is converted with
/// `backpressure_event_from_wit` after the context has been obtained from
/// `WasmContext::from_host()`.
pub async fn run_on_mailbox_backpressure<W: Workload>(
    workload: &W,
    event: wit_types::BackpressureEvent,
) {
    let host_ctx = WasmContext::from_host().await;
    let backpressure = backpressure_event_from_wit(event);
    workload
        .on_mailbox_backpressure(&host_ctx, &backpressure)
        .await;
}
/// Forwards an incoming data-stream chunk and its sender to
/// `context::dispatch_registered_stream`.
///
/// Unlike the other `run_*` entry points in this file, this one takes no
/// workload argument.
///
/// # Errors
///
/// Returns the dispatcher's `ActrError`, converted to its WIT form.
pub async fn run_on_data_stream(
    chunk: wit_types::DataStream,
    sender: wit_types::ActrId,
) -> Result<(), wit_types::ActrError> {
    let outcome = super::context::dispatch_registered_stream(chunk, sender).await;
    outcome.map_err(actr_error_to_wit)
}