1use actr_framework::guest::dynclib_abi::{
7 self as guest_abi, HostCallRawV1, HostCallV1, HostDiscoverV1, HostRegisterStreamV1,
8 HostSendDataStreamV1, HostTellV1, HostUnregisterStreamV1,
9};
10#[cfg(feature = "dynclib-engine")]
11use actr_framework::guest::dynclib_abi::{AbiPayload, GuestHandleV1, GuestHookV1};
12use actr_framework::{
13 BackpressureEvent, CredentialEvent, ErrorEvent, MessageDispatcher, PeerEvent,
14 Workload as FrameworkWorkload,
15};
16use actr_protocol::{ActorResult, ActrError, ActrId, DataStream, RpcEnvelope};
17use async_trait::async_trait;
18use bytes::Bytes;
19#[cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]
20use prost::Message;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::Arc;
24
25use crate::context::RuntimeContext;
26
27pub type InvocationContext = guest_abi::InvocationContextV1;
29
30#[derive(Debug)]
32pub enum HostOperation {
33 Call(HostCallV1),
34 Tell(HostTellV1),
35 Discover(HostDiscoverV1),
36 CallRaw(HostCallRawV1),
37 RegisterStream(HostRegisterStreamV1),
38 UnregisterStream(HostUnregisterStreamV1),
39 SendDataStream(HostSendDataStreamV1),
40}
41
42#[derive(Debug)]
44pub enum HostOperationResult {
45 Bytes(Vec<u8>),
46 Done,
47 Error(i32),
48}
49
50pub type HostAbiFn = Arc<
58 dyn Fn(HostOperation) -> Pin<Box<dyn Future<Output = HostOperationResult> + Send>>
59 + Send
60 + Sync,
61>;
62
63#[derive(Debug, Clone)]
70#[allow(dead_code)]
71pub(crate) enum PackageHookEvent {
72 SignalingConnecting,
73 SignalingConnected,
74 SignalingDisconnected,
75 WebSocketConnecting(PeerEvent),
76 WebSocketConnected(PeerEvent),
77 WebSocketDisconnected(PeerEvent),
78 WebRtcConnecting(PeerEvent),
79 WebRtcConnected(PeerEvent),
80 WebRtcDisconnected(PeerEvent),
81 CredentialRenewed(CredentialEvent),
82 CredentialExpiring(CredentialEvent),
83 MailboxBackpressure(BackpressureEvent),
84}
85
86impl PackageHookEvent {
87 pub(crate) fn request_id(&self) -> &'static str {
88 match self {
89 PackageHookEvent::SignalingConnecting => "hook:on_signaling_connecting",
90 PackageHookEvent::SignalingConnected => "hook:on_signaling_connected",
91 PackageHookEvent::SignalingDisconnected => "hook:on_signaling_disconnected",
92 PackageHookEvent::WebSocketConnecting(_) => "hook:on_websocket_connecting",
93 PackageHookEvent::WebSocketConnected(_) => "hook:on_websocket_connected",
94 PackageHookEvent::WebSocketDisconnected(_) => "hook:on_websocket_disconnected",
95 PackageHookEvent::WebRtcConnecting(_) => "hook:on_webrtc_connecting",
96 PackageHookEvent::WebRtcConnected(_) => "hook:on_webrtc_connected",
97 PackageHookEvent::WebRtcDisconnected(_) => "hook:on_webrtc_disconnected",
98 PackageHookEvent::CredentialRenewed(_) => "hook:on_credential_renewed",
99 PackageHookEvent::CredentialExpiring(_) => "hook:on_credential_expiring",
100 PackageHookEvent::MailboxBackpressure(_) => "hook:on_mailbox_backpressure",
101 }
102 }
103}
104
105#[async_trait]
128#[allow(dead_code)]
129pub(crate) trait LinkedWorkloadHandle: Send + Sync + 'static {
130 async fn on_start(&self, _ctx: &RuntimeContext) -> ActorResult<()> {
133 Ok(())
134 }
135 async fn on_ready(&self, _ctx: &RuntimeContext) -> ActorResult<()> {
136 Ok(())
137 }
138 async fn on_stop(&self, _ctx: &RuntimeContext) -> ActorResult<()> {
139 Ok(())
140 }
141 async fn on_error(&self, _ctx: &RuntimeContext, _event: &ErrorEvent) -> ActorResult<()> {
142 Ok(())
143 }
144
145 async fn on_signaling_connecting(&self, _ctx: Option<&RuntimeContext>) {}
147 async fn on_signaling_connected(&self, _ctx: Option<&RuntimeContext>) {}
148 async fn on_signaling_disconnected(&self, _ctx: &RuntimeContext) {}
149
150 async fn on_websocket_connecting(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
152 async fn on_websocket_connected(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
153 async fn on_websocket_disconnected(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
154
155 async fn on_webrtc_connecting(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
157 async fn on_webrtc_connected(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
158 async fn on_webrtc_disconnected(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
159
160 async fn on_credential_renewed(&self, _ctx: &RuntimeContext, _event: &CredentialEvent) {}
162 async fn on_credential_expiring(&self, _ctx: &RuntimeContext, _event: &CredentialEvent) {}
163
164 async fn on_mailbox_backpressure(&self, _ctx: &RuntimeContext, _event: &BackpressureEvent) {}
166
167 async fn dispatch(
176 &self,
177 _envelope: RpcEnvelope,
178 _ctx: Arc<RuntimeContext>,
179 ) -> ActorResult<Bytes> {
180 Err(ActrError::NotImplemented(
181 "linked workload handle has no dispatcher bound".to_string(),
182 ))
183 }
184}
185
186pub(crate) struct WorkloadAdapter<W: FrameworkWorkload> {
198 inner: Arc<W>,
199}
200
201impl<W: FrameworkWorkload> WorkloadAdapter<W> {
202 pub fn new(workload: W) -> Arc<Self> {
206 Arc::new(Self {
207 inner: Arc::new(workload),
208 })
209 }
210
211 pub async fn dispatch_with_ctx<C: actr_framework::Context>(
221 &self,
222 envelope: RpcEnvelope,
223 ctx: &C,
224 ) -> ActorResult<Bytes> {
225 <W::Dispatcher as MessageDispatcher>::dispatch(self.inner.as_ref(), envelope, ctx).await
226 }
227}
228
229#[async_trait]
230impl<W: FrameworkWorkload> LinkedWorkloadHandle for WorkloadAdapter<W> {
231 async fn on_start(&self, ctx: &RuntimeContext) -> ActorResult<()> {
233 self.inner.on_start(ctx).await
234 }
235 async fn on_ready(&self, ctx: &RuntimeContext) -> ActorResult<()> {
236 self.inner.on_ready(ctx).await
237 }
238 async fn on_stop(&self, ctx: &RuntimeContext) -> ActorResult<()> {
239 self.inner.on_stop(ctx).await
240 }
241 async fn on_error(&self, ctx: &RuntimeContext, event: &ErrorEvent) -> ActorResult<()> {
242 self.inner.on_error(ctx, event).await
243 }
244
245 async fn on_signaling_connecting(&self, ctx: Option<&RuntimeContext>) {
247 self.inner.on_signaling_connecting(ctx).await
248 }
249 async fn on_signaling_connected(&self, ctx: Option<&RuntimeContext>) {
250 self.inner.on_signaling_connected(ctx).await
251 }
252 async fn on_signaling_disconnected(&self, ctx: &RuntimeContext) {
253 self.inner.on_signaling_disconnected(ctx).await
254 }
255
256 async fn on_websocket_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
258 self.inner.on_websocket_connecting(ctx, event).await
259 }
260 async fn on_websocket_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
261 self.inner.on_websocket_connected(ctx, event).await
262 }
263 async fn on_websocket_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
264 self.inner.on_websocket_disconnected(ctx, event).await
265 }
266
267 async fn on_webrtc_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
269 self.inner.on_webrtc_connecting(ctx, event).await
270 }
271 async fn on_webrtc_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
272 self.inner.on_webrtc_connected(ctx, event).await
273 }
274 async fn on_webrtc_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
275 self.inner.on_webrtc_disconnected(ctx, event).await
276 }
277
278 async fn on_credential_renewed(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
280 self.inner.on_credential_renewed(ctx, event).await
281 }
282 async fn on_credential_expiring(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
283 self.inner.on_credential_expiring(ctx, event).await
284 }
285
286 async fn on_mailbox_backpressure(&self, ctx: &RuntimeContext, event: &BackpressureEvent) {
288 self.inner.on_mailbox_backpressure(ctx, event).await
289 }
290
291 async fn dispatch(
293 &self,
294 envelope: RpcEnvelope,
295 ctx: Arc<RuntimeContext>,
296 ) -> ActorResult<Bytes> {
297 self.dispatch_with_ctx(envelope, ctx.as_ref()).await
298 }
299}
300
301pub(crate) struct LinkedHandleObserver {
306 pub(crate) handle: Arc<dyn LinkedWorkloadHandle>,
307}
308
309#[async_trait]
310impl crate::lifecycle::hooks::WorkloadHookObserver for LinkedHandleObserver {
311 async fn on_start(&self, ctx: &RuntimeContext) -> ActorResult<()> {
312 self.handle.on_start(ctx).await
313 }
314 async fn on_ready(&self, ctx: &RuntimeContext) -> ActorResult<()> {
315 self.handle.on_ready(ctx).await
316 }
317 async fn on_stop(&self, ctx: &RuntimeContext) -> ActorResult<()> {
318 self.handle.on_stop(ctx).await
319 }
320 async fn on_error(&self, ctx: &RuntimeContext, event: &ErrorEvent) -> ActorResult<()> {
321 self.handle.on_error(ctx, event).await
322 }
323 async fn on_signaling_connecting(&self, ctx: Option<&RuntimeContext>) {
324 self.handle.on_signaling_connecting(ctx).await
325 }
326 async fn on_signaling_connected(&self, ctx: Option<&RuntimeContext>) {
327 self.handle.on_signaling_connected(ctx).await
328 }
329 async fn on_signaling_disconnected(&self, ctx: &RuntimeContext) {
330 self.handle.on_signaling_disconnected(ctx).await
331 }
332 async fn on_websocket_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
333 self.handle.on_websocket_connecting(ctx, event).await
334 }
335 async fn on_websocket_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
336 self.handle.on_websocket_connected(ctx, event).await
337 }
338 async fn on_websocket_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
339 self.handle.on_websocket_disconnected(ctx, event).await
340 }
341 async fn on_webrtc_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
342 self.handle.on_webrtc_connecting(ctx, event).await
343 }
344 async fn on_webrtc_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
345 self.handle.on_webrtc_connected(ctx, event).await
346 }
347 async fn on_webrtc_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
348 self.handle.on_webrtc_disconnected(ctx, event).await
349 }
350 async fn on_credential_renewed(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
351 self.handle.on_credential_renewed(ctx, event).await
352 }
353 async fn on_credential_expiring(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
354 self.handle.on_credential_expiring(ctx, event).await
355 }
356 async fn on_mailbox_backpressure(&self, ctx: &RuntimeContext, event: &BackpressureEvent) {
357 self.handle.on_mailbox_backpressure(ctx, event).await
358 }
359}
360
361pub(crate) struct PackageHookObserver {
367 pub(crate) workload_dispatch: Arc<tokio::sync::Mutex<Workload>>,
368}
369
370impl PackageHookObserver {
371 async fn dispatch_hook(
372 &self,
373 label: &'static str,
374 ctx: &RuntimeContext,
375 event: PackageHookEvent,
376 ) {
377 use actr_framework::Context as _;
378
379 let invocation = InvocationContext {
380 self_id: ctx.self_id().clone(),
381 caller_id: None,
382 request_id: event.request_id().to_string(),
383 };
384 let call_executor =
385 crate::lifecycle::node::lifecycle_host_abi(ctx.clone(), self.workload_dispatch.clone());
386 let mut workload = self.workload_dispatch.lock().await;
387 if let Err(e) = workload
388 .dispatch_hook_event(event, invocation, &call_executor)
389 .await
390 {
391 tracing::warn!(hook = label, error = %e, "workload package hook returned Err");
392 }
393 }
394}
395
396#[async_trait]
397impl crate::lifecycle::hooks::WorkloadHookObserver for PackageHookObserver {
398 async fn on_signaling_connecting(&self, ctx: Option<&RuntimeContext>) {
399 if let Some(ctx) = ctx {
400 self.dispatch_hook(
401 "on_signaling_connecting",
402 ctx,
403 PackageHookEvent::SignalingConnecting,
404 )
405 .await;
406 }
407 }
408
409 async fn on_signaling_connected(&self, ctx: Option<&RuntimeContext>) {
410 if let Some(ctx) = ctx {
411 self.dispatch_hook(
412 "on_signaling_connected",
413 ctx,
414 PackageHookEvent::SignalingConnected,
415 )
416 .await;
417 }
418 }
419
420 async fn on_signaling_disconnected(&self, ctx: &RuntimeContext) {
421 self.dispatch_hook(
422 "on_signaling_disconnected",
423 ctx,
424 PackageHookEvent::SignalingDisconnected,
425 )
426 .await;
427 }
428
429 async fn on_websocket_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
430 self.dispatch_hook(
431 "on_websocket_connecting",
432 ctx,
433 PackageHookEvent::WebSocketConnecting(event.clone()),
434 )
435 .await;
436 }
437
438 async fn on_websocket_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
439 self.dispatch_hook(
440 "on_websocket_connected",
441 ctx,
442 PackageHookEvent::WebSocketConnected(event.clone()),
443 )
444 .await;
445 }
446
447 async fn on_websocket_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
448 self.dispatch_hook(
449 "on_websocket_disconnected",
450 ctx,
451 PackageHookEvent::WebSocketDisconnected(event.clone()),
452 )
453 .await;
454 }
455
456 async fn on_webrtc_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
457 self.dispatch_hook(
458 "on_webrtc_connecting",
459 ctx,
460 PackageHookEvent::WebRtcConnecting(event.clone()),
461 )
462 .await;
463 }
464
465 async fn on_webrtc_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
466 self.dispatch_hook(
467 "on_webrtc_connected",
468 ctx,
469 PackageHookEvent::WebRtcConnected(event.clone()),
470 )
471 .await;
472 }
473
474 async fn on_webrtc_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
475 self.dispatch_hook(
476 "on_webrtc_disconnected",
477 ctx,
478 PackageHookEvent::WebRtcDisconnected(event.clone()),
479 )
480 .await;
481 }
482
483 async fn on_credential_renewed(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
484 self.dispatch_hook(
485 "on_credential_renewed",
486 ctx,
487 PackageHookEvent::CredentialRenewed(event.clone()),
488 )
489 .await;
490 }
491
492 async fn on_credential_expiring(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
493 self.dispatch_hook(
494 "on_credential_expiring",
495 ctx,
496 PackageHookEvent::CredentialExpiring(event.clone()),
497 )
498 .await;
499 }
500
501 async fn on_mailbox_backpressure(&self, ctx: &RuntimeContext, event: &BackpressureEvent) {
502 self.dispatch_hook(
503 "on_mailbox_backpressure",
504 ctx,
505 PackageHookEvent::MailboxBackpressure(*event),
506 )
507 .await;
508 }
509}
510
511#[allow(clippy::large_enum_variant)]
522pub(crate) enum Workload {
523 Linked(Arc<dyn LinkedWorkloadHandle>),
526 #[cfg(feature = "wasm-engine")]
527 Wasm(crate::wasm::WasmWorkload),
528 #[cfg(feature = "dynclib-engine")]
529 DynClib(crate::dynclib::DynClibWorkload),
530}
531
532impl std::fmt::Debug for Workload {
533 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534 match self {
535 Workload::Linked(_) => f.write_str("Workload::Linked(<dyn LinkedWorkloadHandle>)"),
536 #[cfg(feature = "wasm-engine")]
537 Workload::Wasm(w) => f.debug_tuple("Workload::Wasm").field(w).finish(),
538 #[cfg(feature = "dynclib-engine")]
539 Workload::DynClib(w) => f.debug_tuple("Workload::DynClib").field(w).finish(),
540 }
541 }
542}
543
544impl Workload {
545 pub(crate) fn on_start<'a>(
547 &'a mut self,
548 ctx: RuntimeContext,
549 invocation: InvocationContext,
550 host_abi: &'a HostAbiFn,
551 ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
552 Box::pin(async move {
553 let _ = (&invocation, host_abi);
554 match self {
555 Workload::Linked(handle) => handle.on_start(&ctx).await,
556 #[cfg(feature = "wasm-engine")]
557 Workload::Wasm(workload) => workload
558 .call_on_start(invocation, host_abi)
559 .await
560 .map_err(|e| ActrError::Internal(format!("workload on_start failed: {e}"))),
561 #[cfg(feature = "dynclib-engine")]
562 Workload::DynClib(workload) => workload
563 .call_on_start(invocation, host_abi)
564 .await
565 .map_err(|e| ActrError::Internal(format!("workload on_start failed: {e}"))),
566 }
567 })
568 }
569
570 pub(crate) fn on_ready<'a>(
572 &'a mut self,
573 ctx: RuntimeContext,
574 invocation: InvocationContext,
575 host_abi: &'a HostAbiFn,
576 ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
577 Box::pin(async move {
578 let _ = (&invocation, host_abi);
579 match self {
580 Workload::Linked(handle) => handle.on_ready(&ctx).await,
581 #[cfg(feature = "wasm-engine")]
582 Workload::Wasm(workload) => workload
583 .call_on_ready(invocation, host_abi)
584 .await
585 .map_err(|e| ActrError::Internal(format!("workload on_ready failed: {e}"))),
586 #[cfg(feature = "dynclib-engine")]
587 Workload::DynClib(workload) => workload
588 .call_on_ready(invocation, host_abi)
589 .await
590 .map_err(|e| ActrError::Internal(format!("workload on_ready failed: {e}"))),
591 }
592 })
593 }
594
595 pub(crate) fn on_stop<'a>(
597 &'a mut self,
598 ctx: RuntimeContext,
599 invocation: InvocationContext,
600 host_abi: &'a HostAbiFn,
601 ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
602 Box::pin(async move {
603 let _ = (&invocation, host_abi);
604 match self {
605 Workload::Linked(handle) => handle.on_stop(&ctx).await,
606 #[cfg(feature = "wasm-engine")]
607 Workload::Wasm(workload) => workload
608 .call_on_stop(invocation, host_abi)
609 .await
610 .map_err(|e| ActrError::Internal(format!("workload on_stop failed: {e}"))),
611 #[cfg(feature = "dynclib-engine")]
612 Workload::DynClib(workload) => workload
613 .call_on_stop(invocation, host_abi)
614 .await
615 .map_err(|e| ActrError::Internal(format!("workload on_stop failed: {e}"))),
616 }
617 })
618 }
619
620 pub(crate) fn dispatch_envelope<'a>(
622 &'a mut self,
623 envelope: RpcEnvelope,
624 ctx: crate::context::RuntimeContext,
625 invocation: InvocationContext,
626 _host_abi: &'a HostAbiFn,
627 ) -> Pin<Box<dyn Future<Output = ActorResult<Bytes>> + Send + 'a>> {
628 Box::pin(async move {
629 let _ = &invocation;
630 match self {
631 Workload::Linked(handle) => handle.dispatch(envelope, Arc::new(ctx)).await,
632 #[cfg(feature = "wasm-engine")]
633 Workload::Wasm(workload) => {
634 let request_bytes = envelope.encode_to_vec();
635 workload
636 .handle(&request_bytes, invocation, _host_abi)
637 .await
638 .map(Bytes::from)
639 .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e}")))
640 }
641 #[cfg(feature = "dynclib-engine")]
642 Workload::DynClib(workload) => {
643 let request_bytes = envelope.encode_to_vec();
644 workload
645 .handle(&request_bytes, invocation, _host_abi)
646 .await
647 .map(Bytes::from)
648 .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e}")))
649 }
650 }
651 })
652 }
653
654 pub(crate) fn dispatch_data_stream<'a>(
655 &'a mut self,
656 chunk: DataStream,
657 sender: ActrId,
658 invocation: InvocationContext,
659 host_abi: &'a HostAbiFn,
660 ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
661 Box::pin(async move {
662 let _ = (&chunk, &sender, host_abi);
663 let _ = &invocation;
664 match self {
665 Workload::Linked(_) => {
666 let _ = (&chunk, &sender, host_abi);
667 Err(ActrError::NotImplemented(
668 "linked workload stream callbacks are registered directly on RuntimeContext"
669 .to_string(),
670 ))
671 }
672 #[cfg(feature = "wasm-engine")]
673 Workload::Wasm(workload) => workload
674 .handle_data_stream(chunk, sender, invocation, host_abi)
675 .await
676 .map_err(|e| {
677 ActrError::Internal(format!("workload stream dispatch failed: {e}"))
678 }),
679 #[cfg(feature = "dynclib-engine")]
680 Workload::DynClib(workload) => workload
681 .handle_data_stream(chunk, sender, host_abi)
682 .await
683 .map_err(|e| {
684 ActrError::Internal(format!("workload stream dispatch failed: {e}"))
685 }),
686 }
687 })
688 }
689
690 pub(crate) fn dispatch_hook_event<'a>(
696 &'a mut self,
697 event: PackageHookEvent,
698 invocation: InvocationContext,
699 host_abi: &'a HostAbiFn,
700 ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
701 Box::pin(async move {
702 let _ = (&event, &invocation, host_abi);
703 match self {
704 Workload::Linked(_) => Ok(()),
705 #[cfg(feature = "wasm-engine")]
706 Workload::Wasm(workload) => workload
707 .call_hook_event(event, invocation, host_abi)
708 .await
709 .map_err(|e| ActrError::Internal(format!("workload hook failed: {e}"))),
710 #[cfg(feature = "dynclib-engine")]
711 Workload::DynClib(workload) => workload
712 .call_hook_event(event, invocation, host_abi)
713 .await
714 .map_err(|e| ActrError::Internal(format!("workload hook failed: {e}"))),
715 }
716 })
717 }
718}
719
720#[cfg(feature = "dynclib-engine")]
728pub(crate) fn decode_host_operation(frame: guest_abi::AbiFrame) -> Result<HostOperation, i32> {
729 if frame.abi_version != guest_abi::version::V1 {
730 return Err(guest_abi::code::PROTOCOL_ERROR);
731 }
732
733 match frame.op {
734 guest_abi::op::HOST_CALL => {
735 let payload = <HostCallV1 as AbiPayload>::decode_payload(&frame.payload)?;
736 Ok(HostOperation::Call(payload))
737 }
738 guest_abi::op::HOST_TELL => {
739 let payload = <HostTellV1 as AbiPayload>::decode_payload(&frame.payload)?;
740 Ok(HostOperation::Tell(payload))
741 }
742 guest_abi::op::HOST_CALL_RAW => {
743 let payload = <HostCallRawV1 as AbiPayload>::decode_payload(&frame.payload)?;
744 Ok(HostOperation::CallRaw(payload))
745 }
746 guest_abi::op::HOST_DISCOVER => {
747 let payload = <HostDiscoverV1 as AbiPayload>::decode_payload(&frame.payload)?;
748 Ok(HostOperation::Discover(payload))
749 }
750 guest_abi::op::HOST_REGISTER_STREAM => {
751 let payload = <HostRegisterStreamV1 as AbiPayload>::decode_payload(&frame.payload)?;
752 Ok(HostOperation::RegisterStream(payload))
753 }
754 guest_abi::op::HOST_UNREGISTER_STREAM => {
755 let payload = <HostUnregisterStreamV1 as AbiPayload>::decode_payload(&frame.payload)?;
756 Ok(HostOperation::UnregisterStream(payload))
757 }
758 guest_abi::op::HOST_SEND_DATA_STREAM => {
759 let payload = <HostSendDataStreamV1 as AbiPayload>::decode_payload(&frame.payload)?;
760 Ok(HostOperation::SendDataStream(payload))
761 }
762 _ => Err(guest_abi::code::UNSUPPORTED_OP),
763 }
764}
765
766#[cfg(feature = "dynclib-engine")]
768pub(crate) fn encode_guest_handle_request(
769 request_bytes: &[u8],
770 ctx: InvocationContext,
771) -> Result<Vec<u8>, i32> {
772 let request = GuestHandleV1 {
773 ctx,
774 rpc_envelope: request_bytes.to_vec(),
775 };
776 let frame = request.to_frame()?;
777 guest_abi::encode_message(&frame)
778}
779
780#[cfg(feature = "dynclib-engine")]
781pub(crate) fn encode_guest_data_stream_request(
782 chunk: DataStream,
783 sender: ActrId,
784) -> Result<Vec<u8>, i32> {
785 let request = guest_abi::GuestDataStreamV1 { chunk, sender };
786 let frame = request.to_frame()?;
787 guest_abi::encode_message(&frame)
788}
789
790#[cfg(feature = "dynclib-engine")]
792pub(crate) fn encode_guest_lifecycle_request(
793 hook: u32,
794 ctx: InvocationContext,
795) -> Result<Vec<u8>, i32> {
796 let request = guest_abi::GuestLifecycleV1 { ctx, hook };
797 let frame = request.to_frame()?;
798 guest_abi::encode_message(&frame)
799}
800
801#[cfg(feature = "dynclib-engine")]
802fn timestamp_to_v1(time: std::time::SystemTime) -> guest_abi::TimestampV1 {
803 let duration = time
804 .duration_since(std::time::UNIX_EPOCH)
805 .unwrap_or_default();
806 guest_abi::TimestampV1 {
807 seconds: duration.as_secs(),
808 nanoseconds: duration.subsec_nanos(),
809 }
810}
811
812#[cfg(feature = "dynclib-engine")]
813fn peer_event_to_v1(event: PeerEvent) -> guest_abi::PeerEventV1 {
814 guest_abi::PeerEventV1 {
815 peer: event.peer,
816 relayed: event.relayed,
817 }
818}
819
820#[cfg(feature = "dynclib-engine")]
821fn credential_event_to_v1(event: CredentialEvent) -> guest_abi::CredentialEventV1 {
822 guest_abi::CredentialEventV1 {
823 new_expiry: timestamp_to_v1(event.new_expiry),
824 }
825}
826
827#[cfg(feature = "dynclib-engine")]
828fn backpressure_event_to_v1(event: BackpressureEvent) -> guest_abi::BackpressureEventV1 {
829 guest_abi::BackpressureEventV1 {
830 queue_len: event.queue_len as u64,
831 threshold: event.threshold as u64,
832 }
833}
834
835#[cfg(feature = "dynclib-engine")]
837pub(crate) fn encode_guest_hook_request(
838 event: PackageHookEvent,
839 ctx: InvocationContext,
840) -> Result<Vec<u8>, i32> {
841 let mut request = GuestHookV1 {
842 ctx,
843 hook: 0,
844 peer: None,
845 credential: None,
846 backpressure: None,
847 };
848
849 match event {
850 PackageHookEvent::SignalingConnecting => {
851 request.hook = guest_abi::runtime_hook::ON_SIGNALING_CONNECTING;
852 }
853 PackageHookEvent::SignalingConnected => {
854 request.hook = guest_abi::runtime_hook::ON_SIGNALING_CONNECTED;
855 }
856 PackageHookEvent::SignalingDisconnected => {
857 request.hook = guest_abi::runtime_hook::ON_SIGNALING_DISCONNECTED;
858 }
859 PackageHookEvent::WebSocketConnecting(event) => {
860 request.hook = guest_abi::runtime_hook::ON_WEBSOCKET_CONNECTING;
861 request.peer = Some(peer_event_to_v1(event));
862 }
863 PackageHookEvent::WebSocketConnected(event) => {
864 request.hook = guest_abi::runtime_hook::ON_WEBSOCKET_CONNECTED;
865 request.peer = Some(peer_event_to_v1(event));
866 }
867 PackageHookEvent::WebSocketDisconnected(event) => {
868 request.hook = guest_abi::runtime_hook::ON_WEBSOCKET_DISCONNECTED;
869 request.peer = Some(peer_event_to_v1(event));
870 }
871 PackageHookEvent::WebRtcConnecting(event) => {
872 request.hook = guest_abi::runtime_hook::ON_WEBRTC_CONNECTING;
873 request.peer = Some(peer_event_to_v1(event));
874 }
875 PackageHookEvent::WebRtcConnected(event) => {
876 request.hook = guest_abi::runtime_hook::ON_WEBRTC_CONNECTED;
877 request.peer = Some(peer_event_to_v1(event));
878 }
879 PackageHookEvent::WebRtcDisconnected(event) => {
880 request.hook = guest_abi::runtime_hook::ON_WEBRTC_DISCONNECTED;
881 request.peer = Some(peer_event_to_v1(event));
882 }
883 PackageHookEvent::CredentialRenewed(event) => {
884 request.hook = guest_abi::runtime_hook::ON_CREDENTIAL_RENEWED;
885 request.credential = Some(credential_event_to_v1(event));
886 }
887 PackageHookEvent::CredentialExpiring(event) => {
888 request.hook = guest_abi::runtime_hook::ON_CREDENTIAL_EXPIRING;
889 request.credential = Some(credential_event_to_v1(event));
890 }
891 PackageHookEvent::MailboxBackpressure(event) => {
892 request.hook = guest_abi::runtime_hook::ON_MAILBOX_BACKPRESSURE;
893 request.backpressure = Some(backpressure_event_to_v1(event));
894 }
895 }
896
897 let frame = request.to_frame()?;
898 guest_abi::encode_message(&frame)
899}
900
901pub(crate) fn decode_dest(
905 v1: &actr_framework::guest::dynclib_abi::DestV1,
906) -> Option<actr_framework::Dest> {
907 actr_framework::guest::dynclib_abi::dest_v1_to_dest(v1)
908}
909
910#[cfg(test)]
911mod tests {
912 use super::*;
913 use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
914 use crate::lifecycle::hooks::{
915 HookContextBuilder, WorkloadHookObserverRef, build_hook_callback,
916 };
917 use crate::outbound::{Gate, HostGate};
918 use crate::transport::HostTransport;
919 use crate::wire::webrtc::{
920 HookEvent, ReconnectConfig, SignalingClient, SignalingConfig, WebSocketSignalingClient,
921 };
922 use actr_framework::Context as FrameworkContext;
923 use actr_framework::test_support::DummyContext;
924 use actr_protocol::{AIdCredential, ActrId, ActrType, Realm};
925 use tokio::sync::mpsc;
926
927 fn make_id(serial: u64) -> ActrId {
928 ActrId {
929 realm: Realm { realm_id: 1 },
930 serial_number: serial,
931 r#type: ActrType {
932 manufacturer: "test".to_string(),
933 name: "UnitTestActor".to_string(),
934 version: "0.0.1".to_string(),
935 },
936 }
937 }
938
939 fn test_credential() -> AIdCredential {
940 AIdCredential {
941 key_id: 1,
942 claims: bytes::Bytes::from_static(b"claims"),
943 signature: bytes::Bytes::from(vec![0; 64]),
944 }
945 }
946
947 fn test_runtime_context(serial: u64) -> RuntimeContext {
948 let host_transport = Arc::new(HostTransport::new());
949 let inproc_gate = Gate::Host(Arc::new(HostGate::new(host_transport)));
950 let signaling_client: Arc<dyn SignalingClient> =
951 Arc::new(WebSocketSignalingClient::new(SignalingConfig {
952 server_url: url::Url::parse("ws://127.0.0.1:9").expect("valid test URL"),
953 connection_timeout: 1,
954 heartbeat_interval: 30,
955 reconnect_config: ReconnectConfig::default(),
956 auth_config: None,
957 webrtc_role: None,
958 }));
959
960 RuntimeContext::new(
961 make_id(serial),
962 None,
963 "workload-test".to_string(),
964 inproc_gate,
965 None,
966 Arc::new(DataStreamRegistry::new()),
967 Arc::new(MediaFrameRegistry::new()),
968 signaling_client,
969 test_credential(),
970 None,
971 )
972 }
973
974 struct EchoWorkload {
976 suffix: String,
977 }
978
979 #[async_trait]
980 impl FrameworkWorkload for EchoWorkload {
981 type Dispatcher = EchoDispatcher;
982 }
983
984 struct EchoDispatcher;
985
986 #[async_trait]
987 impl MessageDispatcher for EchoDispatcher {
988 type Workload = EchoWorkload;
989
990 async fn dispatch<C: FrameworkContext>(
991 workload: &Self::Workload,
992 envelope: RpcEnvelope,
993 _ctx: &C,
994 ) -> ActorResult<Bytes> {
995 match envelope.route_key.as_str() {
996 "echo" => {
997 let payload = envelope
998 .payload
999 .as_ref()
1000 .map(|b| String::from_utf8_lossy(b).to_string())
1001 .unwrap_or_default();
1002 let reply = format!("{payload}{}", workload.suffix);
1003 Ok(Bytes::from(reply.into_bytes()))
1004 }
1005 other => Err(ActrError::InvalidArgument(format!(
1006 "unknown route: {other}"
1007 ))),
1008 }
1009 }
1010 }
1011
1012 struct LifecycleFailingWorkload;
1013
1014 #[async_trait]
1015 impl FrameworkWorkload for LifecycleFailingWorkload {
1016 type Dispatcher = LifecycleFailingDispatcher;
1017
1018 async fn on_start<C: FrameworkContext>(&self, _ctx: &C) -> ActorResult<()> {
1019 Err(ActrError::Internal("on_start failed".to_string()))
1020 }
1021 }
1022
1023 struct LifecycleFailingDispatcher;
1024
1025 #[async_trait]
1026 impl MessageDispatcher for LifecycleFailingDispatcher {
1027 type Workload = LifecycleFailingWorkload;
1028
1029 async fn dispatch<C: FrameworkContext>(
1030 _workload: &Self::Workload,
1031 _envelope: RpcEnvelope,
1032 _ctx: &C,
1033 ) -> ActorResult<Bytes> {
1034 Ok(Bytes::new())
1035 }
1036 }
1037
1038 struct RecordingWorkload {
1039 tx: mpsc::UnboundedSender<String>,
1040 }
1041
1042 #[async_trait]
1043 impl FrameworkWorkload for RecordingWorkload {
1044 type Dispatcher = RecordingDispatcher;
1045
1046 async fn on_ready<C: FrameworkContext>(&self, ctx: &C) -> ActorResult<()> {
1047 let _ = self
1048 .tx
1049 .send(format!("on_ready:self={}", ctx.self_id().serial_number));
1050 Ok(())
1051 }
1052
1053 async fn on_stop<C: FrameworkContext>(&self, ctx: &C) -> ActorResult<()> {
1054 let _ = self
1055 .tx
1056 .send(format!("on_stop:self={}", ctx.self_id().serial_number));
1057 Ok(())
1058 }
1059
1060 async fn on_websocket_connected<C: FrameworkContext>(&self, ctx: &C, event: &PeerEvent) {
1061 let _ = self.tx.send(format!(
1062 "on_websocket_connected:self={}:peer={}:relayed={}",
1063 ctx.self_id().serial_number,
1064 event.peer.serial_number,
1065 match event.relayed {
1066 Some(true) => "true",
1067 Some(false) => "false",
1068 None => "none",
1069 }
1070 ));
1071 }
1072 }
1073
1074 struct RecordingDispatcher;
1075
1076 #[async_trait]
1077 impl MessageDispatcher for RecordingDispatcher {
1078 type Workload = RecordingWorkload;
1079
1080 async fn dispatch<C: FrameworkContext>(
1081 _workload: &Self::Workload,
1082 _envelope: RpcEnvelope,
1083 _ctx: &C,
1084 ) -> ActorResult<Bytes> {
1085 Ok(Bytes::new())
1086 }
1087 }
1088
1089 async fn expect_recorded(rx: &mut mpsc::UnboundedReceiver<String>, expected: &'static str) {
1090 let observed = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
1091 .await
1092 .expect("linked hook was not called")
1093 .expect("recording workload dropped");
1094 assert_eq!(observed, expected);
1095 }
1096
1097 #[tokio::test]
1098 async fn adapter_dispatch_routes_to_workload_dispatcher() {
1099 let adapter = WorkloadAdapter::new(EchoWorkload {
1100 suffix: "-ok".to_string(),
1101 });
1102 let ctx = DummyContext::new(make_id(42));
1103 let envelope = RpcEnvelope {
1104 request_id: "r1".to_string(),
1105 route_key: "echo".to_string(),
1106 payload: Some(Bytes::from_static(b"hello")),
1107 ..Default::default()
1108 };
1109 let resp = adapter
1110 .dispatch_with_ctx(envelope, &ctx)
1111 .await
1112 .expect("dispatch must succeed");
1113 assert_eq!(&resp[..], b"hello-ok");
1114 }
1115
1116 #[tokio::test]
1117 async fn adapter_dispatch_propagates_unknown_route_error() {
1118 let adapter = WorkloadAdapter::new(EchoWorkload {
1119 suffix: "-ok".to_string(),
1120 });
1121 let ctx = DummyContext::new(make_id(1));
1122 let envelope = RpcEnvelope {
1123 request_id: "r2".to_string(),
1124 route_key: "does/not/exist".to_string(),
1125 payload: Some(Bytes::new()),
1126 ..Default::default()
1127 };
1128 let err = adapter
1129 .dispatch_with_ctx(envelope, &ctx)
1130 .await
1131 .expect_err("unknown route must error");
1132 match err {
1133 ActrError::InvalidArgument(msg) => {
1134 assert!(msg.contains("unknown route"), "unexpected message: {msg}")
1135 }
1136 other => panic!("expected InvalidArgument, got {other:?}"),
1137 }
1138 }
1139
1140 #[tokio::test]
1141 async fn adapter_on_start_propagates_workload_error() {
1142 let adapter = WorkloadAdapter::new(LifecycleFailingWorkload);
1143 let ctx = test_runtime_context(7);
1144
1145 let err = adapter
1146 .on_start(&ctx)
1147 .await
1148 .expect_err("adapter must preserve lifecycle errors");
1149
1150 match err {
1151 ActrError::Internal(msg) => {
1152 assert!(msg.contains("on_start failed"), "unexpected message: {msg}");
1153 }
1154 other => panic!("expected Internal, got {other:?}"),
1155 }
1156 }
1157
1158 #[tokio::test]
1159 async fn workload_on_start_propagates_linked_error() {
1160 let handle: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(LifecycleFailingWorkload);
1161 let mut workload = Workload::Linked(handle);
1162 let ctx = test_runtime_context(8);
1163 let invocation = InvocationContext {
1164 self_id: make_id(8),
1165 caller_id: None,
1166 request_id: "lifecycle:on_start".to_string(),
1167 };
1168 let host_abi: HostAbiFn = Arc::new(|_| Box::pin(async { HostOperationResult::Done }));
1169
1170 let err = workload
1171 .on_start(ctx, invocation, &host_abi)
1172 .await
1173 .expect_err("workload lifecycle must preserve linked errors");
1174
1175 match err {
1176 ActrError::Internal(msg) => {
1177 assert!(msg.contains("on_start failed"), "unexpected message: {msg}");
1178 }
1179 other => panic!("expected Internal, got {other:?}"),
1180 }
1181 }
1182
1183 #[tokio::test]
1184 async fn workload_on_ready_and_on_stop_reach_linked_workload() {
1185 let (tx, mut rx) = mpsc::unbounded_channel();
1186 let handle: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(RecordingWorkload { tx });
1187 let mut workload = Workload::Linked(handle);
1188 let host_abi: HostAbiFn = Arc::new(|_| Box::pin(async { HostOperationResult::Done }));
1189
1190 workload
1191 .on_ready(
1192 test_runtime_context(9),
1193 InvocationContext {
1194 self_id: make_id(9),
1195 caller_id: None,
1196 request_id: "lifecycle:on_ready".to_string(),
1197 },
1198 &host_abi,
1199 )
1200 .await
1201 .expect("linked on_ready should dispatch");
1202 workload
1203 .on_stop(
1204 test_runtime_context(9),
1205 InvocationContext {
1206 self_id: make_id(9),
1207 caller_id: None,
1208 request_id: "lifecycle:on_stop".to_string(),
1209 },
1210 &host_abi,
1211 )
1212 .await
1213 .expect("linked on_stop should dispatch");
1214
1215 expect_recorded(&mut rx, "on_ready:self=9").await;
1216 expect_recorded(&mut rx, "on_stop:self=9").await;
1217 }
1218
1219 #[tokio::test]
1220 async fn hook_callback_reaches_linked_workload_once() {
1221 let (tx, mut rx) = mpsc::unbounded_channel();
1222 let handle: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(RecordingWorkload { tx });
1223 let observer: WorkloadHookObserverRef = Arc::new(LinkedHandleObserver { handle });
1224 let ctx = test_runtime_context(10);
1225 let ctx_builder: HookContextBuilder = Arc::new(move || {
1226 let ctx = ctx.clone();
1227 Box::pin(async move { Some(ctx) })
1228 });
1229 let cb = build_hook_callback(Some(observer), ctx_builder);
1230
1231 cb(HookEvent::WebSocketConnected {
1232 peer_id: make_id(42),
1233 })
1234 .await;
1235
1236 expect_recorded(
1237 &mut rx,
1238 "on_websocket_connected:self=10:peer=42:relayed=none",
1239 )
1240 .await;
1241 tokio::task::yield_now().await;
1242 tokio::task::yield_now().await;
1243 assert!(
1244 rx.try_recv().is_err(),
1245 "linked workload should receive exactly one hook event"
1246 );
1247 }
1248
1249 #[test]
1253 fn linked_workload_handle_is_object_safe() {
1254 fn accepts(_: Arc<dyn LinkedWorkloadHandle>) {}
1255 let adapter: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(EchoWorkload {
1256 suffix: "-ok".to_string(),
1257 });
1258 accepts(adapter);
1259 }
1260
1261 #[test]
1263 fn linked_workload_debug_surface() {
1264 let handle: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(EchoWorkload {
1265 suffix: "-ok".to_string(),
1266 });
1267 let linked = Workload::Linked(handle);
1268 assert!(format!("{:?}", linked).starts_with("Workload::Linked"));
1269 }
1270}