use std::time::{Duration, UNIX_EPOCH};
use actr_protocol::{ActrError, ActrId, DataStream, MetadataEntry, Realm, RpcEnvelope};
use async_trait::async_trait;
use bytes::Bytes;
use crate::web::context::WebContext;
use crate::workload::{BackpressureEvent, CredentialEvent, ErrorCategory, ErrorEvent, PeerEvent};
use crate::{MessageDispatcher, Workload};
use actr_web_abi::host as web_host;
use actr_web_abi::types as wit;
fn actr_type_from_wit(t: &wit::ActrType) -> actr_protocol::ActrType {
actr_protocol::ActrType {
manufacturer: t.manufacturer.clone(),
name: t.name.clone(),
version: t.version.clone(),
}
}
fn actr_id_from_wit(id: &wit::ActrId) -> ActrId {
ActrId {
realm: Realm {
realm_id: id.realm.realm_id,
},
serial_number: id.serial_number,
r#type: actr_type_from_wit(&id.actr_type),
}
}
fn timestamp_from_wit(t: wit::Timestamp) -> std::time::SystemTime {
UNIX_EPOCH + Duration::new(t.seconds, t.nanoseconds)
}
fn peer_event_from_wit(e: wit::PeerEvent) -> PeerEvent {
PeerEvent {
peer: actr_id_from_wit(&e.peer),
relayed: e.relayed,
}
}
fn error_category_from_wit(c: wit::ErrorCategory) -> ErrorCategory {
match c {
wit::ErrorCategory::HandlerPanic => ErrorCategory::HandlerPanic,
wit::ErrorCategory::HandlerError => ErrorCategory::HandlerError,
wit::ErrorCategory::SignalingFailure => ErrorCategory::SignalingFailure,
wit::ErrorCategory::TransportFailure => ErrorCategory::TransportFailure,
wit::ErrorCategory::DataStreamDeliveryUncertain => {
ErrorCategory::DataStreamDeliveryUncertain
}
}
}
fn wit_error_to_proto(e: wit::ActrError) -> ActrError {
match e {
wit::ActrError::Unavailable(m) => ActrError::Unavailable(m),
wit::ActrError::TimedOut => ActrError::TimedOut,
wit::ActrError::NotFound(m) => ActrError::NotFound(m),
wit::ActrError::PermissionDenied(m) => ActrError::PermissionDenied(m),
wit::ActrError::InvalidArgument(m) => ActrError::InvalidArgument(m),
wit::ActrError::UnknownRoute(m) => ActrError::UnknownRoute(m),
wit::ActrError::DependencyNotFound(p) => ActrError::DependencyNotFound {
service_name: p.service_name,
message: p.message,
},
wit::ActrError::DecodeFailure(m) => ActrError::DecodeFailure(m),
wit::ActrError::NotImplemented(m) => ActrError::NotImplemented(m),
wit::ActrError::Internal(m) => ActrError::Internal(m),
}
}
fn proto_error_to_wit(e: ActrError) -> wit::ActrError {
match e {
ActrError::Unavailable(m) => wit::ActrError::Unavailable(m),
ActrError::TimedOut => wit::ActrError::TimedOut,
ActrError::NotFound(m) => wit::ActrError::NotFound(m),
ActrError::PermissionDenied(m) => wit::ActrError::PermissionDenied(m),
ActrError::InvalidArgument(m) => wit::ActrError::InvalidArgument(m),
ActrError::UnknownRoute(m) => wit::ActrError::UnknownRoute(m),
ActrError::DependencyNotFound {
service_name,
message,
} => wit::ActrError::DependencyNotFound(wit::DependencyNotFoundPayload {
service_name,
message,
}),
ActrError::DecodeFailure(m) => wit::ActrError::DecodeFailure(m),
ActrError::NotImplemented(m) => wit::ActrError::NotImplemented(m),
ActrError::Internal(m) => wit::ActrError::Internal(m),
}
}
fn error_event_from_wit(e: wit::ErrorEvent) -> ErrorEvent {
ErrorEvent {
source: wit_error_to_proto(e.source),
category: error_category_from_wit(e.category),
context: e.context,
timestamp: timestamp_from_wit(e.timestamp),
}
}
fn credential_event_from_wit(e: wit::CredentialEvent) -> CredentialEvent {
CredentialEvent {
new_expiry: timestamp_from_wit(e.new_expiry),
}
}
fn backpressure_event_from_wit(e: wit::BackpressureEvent) -> BackpressureEvent {
BackpressureEvent {
queue_len: e.queue_len as usize,
threshold: e.threshold as usize,
}
}
fn data_stream_from_wit(chunk: wit::DataStream) -> DataStream {
DataStream {
stream_id: chunk.stream_id,
sequence: chunk.sequence,
payload: chunk.payload.into(),
metadata: chunk
.metadata
.into_iter()
.map(|entry| MetadataEntry {
key: entry.key,
value: entry.value,
})
.collect(),
timestamp_ms: chunk.timestamp_ms,
}
}
fn envelope_from_wit(envelope: wit::RpcEnvelope) -> RpcEnvelope {
RpcEnvelope {
request_id: envelope.request_id,
route_key: envelope.route_key,
payload: if envelope.payload.is_empty() {
None
} else {
Some(Bytes::from(envelope.payload))
},
..Default::default()
}
}
pub struct WebWorkloadAdapter<W> {
inner: W,
}
impl<W> WebWorkloadAdapter<W> {
pub fn new(inner: W) -> Self {
Self { inner }
}
}
impl<W: Clone> Clone for WebWorkloadAdapter<W> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
#[async_trait(?Send)]
impl<W> web_host::Workload for WebWorkloadAdapter<W>
where
W: Workload + Clone + 'static,
{
async fn dispatch(&self, envelope: wit::RpcEnvelope) -> Result<Vec<u8>, wit::ActrError> {
let ctx = WebContext::new(ActrId::default(), None, envelope.request_id.clone());
let envelope = envelope_from_wit(envelope);
let result = <<W as Workload>::Dispatcher as MessageDispatcher>::dispatch(
&self.inner,
envelope,
&ctx,
)
.await;
match result {
Ok(bytes) => Ok(bytes.to_vec()),
Err(e) => Err(proto_error_to_wit(e)),
}
}
async fn on_start(&self) -> Result<(), wit::ActrError> {
let ctx = WebContext::for_lifecycle();
self.inner.on_start(&ctx).await.map_err(proto_error_to_wit)
}
async fn on_ready(&self) -> Result<(), wit::ActrError> {
let ctx = WebContext::for_lifecycle();
self.inner.on_ready(&ctx).await.map_err(proto_error_to_wit)
}
async fn on_stop(&self) -> Result<(), wit::ActrError> {
let ctx = WebContext::for_lifecycle();
self.inner.on_stop(&ctx).await.map_err(proto_error_to_wit)
}
async fn on_error(&self, event: wit::ErrorEvent) -> Result<(), wit::ActrError> {
let ctx = WebContext::for_lifecycle();
let event = error_event_from_wit(event);
self.inner
.on_error(&ctx, &event)
.await
.map_err(proto_error_to_wit)
}
async fn on_signaling_connecting(&self) {
let ctx = WebContext::for_lifecycle();
self.inner.on_signaling_connecting(Some(&ctx)).await;
}
async fn on_signaling_connected(&self) {
let ctx = WebContext::for_lifecycle();
self.inner.on_signaling_connected(Some(&ctx)).await;
}
async fn on_signaling_disconnected(&self) {
let ctx = WebContext::for_lifecycle();
self.inner.on_signaling_disconnected(&ctx).await;
}
async fn on_websocket_connecting(&self, event: wit::PeerEvent) {
let ctx = WebContext::for_lifecycle();
let event = peer_event_from_wit(event);
self.inner.on_websocket_connecting(&ctx, &event).await;
}
async fn on_websocket_connected(&self, event: wit::PeerEvent) {
let ctx = WebContext::for_lifecycle();
let event = peer_event_from_wit(event);
self.inner.on_websocket_connected(&ctx, &event).await;
}
async fn on_websocket_disconnected(&self, event: wit::PeerEvent) {
let ctx = WebContext::for_lifecycle();
let event = peer_event_from_wit(event);
self.inner.on_websocket_disconnected(&ctx, &event).await;
}
async fn on_webrtc_connecting(&self, event: wit::PeerEvent) {
let ctx = WebContext::for_lifecycle();
let event = peer_event_from_wit(event);
self.inner.on_webrtc_connecting(&ctx, &event).await;
}
async fn on_webrtc_connected(&self, event: wit::PeerEvent) {
let ctx = WebContext::for_lifecycle();
let event = peer_event_from_wit(event);
self.inner.on_webrtc_connected(&ctx, &event).await;
}
async fn on_webrtc_disconnected(&self, event: wit::PeerEvent) {
let ctx = WebContext::for_lifecycle();
let event = peer_event_from_wit(event);
self.inner.on_webrtc_disconnected(&ctx, &event).await;
}
async fn on_credential_renewed(&self, event: wit::CredentialEvent) {
let ctx = WebContext::for_lifecycle();
let event = credential_event_from_wit(event);
self.inner.on_credential_renewed(&ctx, &event).await;
}
async fn on_credential_expiring(&self, event: wit::CredentialEvent) {
let ctx = WebContext::for_lifecycle();
let event = credential_event_from_wit(event);
self.inner.on_credential_expiring(&ctx, &event).await;
}
async fn on_mailbox_backpressure(&self, event: wit::BackpressureEvent) {
let ctx = WebContext::for_lifecycle();
let event = backpressure_event_from_wit(event);
self.inner.on_mailbox_backpressure(&ctx, &event).await;
}
async fn on_data_stream(
&self,
chunk: wit::DataStream,
_sender: wit::ActrId,
) -> Result<(), wit::ActrError> {
let chunk = data_stream_from_wit(chunk);
Err(proto_error_to_wit(ActrError::NotImplemented(format!(
"WebWorkloadAdapter::on_data_stream({})",
chunk.stream_id
))))
}
}