Skip to main content

actr_hyper/
workload.rs

1//! Workload runtime abstractions for guest backends.
2//!
3//! This module replaces the old executor adapter layer. `ActrNode` dispatches
4//! directly into a runtime `Workload` enum.
5
6use actr_framework::guest::dynclib_abi::{
7    self as guest_abi, HostCallRawV1, HostCallV1, HostDiscoverV1, HostRegisterStreamV1,
8    HostSendDataStreamV1, HostTellV1, HostUnregisterStreamV1,
9};
10#[cfg(feature = "dynclib-engine")]
11use actr_framework::guest::dynclib_abi::{AbiPayload, GuestHandleV1, GuestHookV1};
12use actr_framework::{
13    BackpressureEvent, CredentialEvent, ErrorEvent, MessageDispatcher, PeerEvent,
14    Workload as FrameworkWorkload,
15};
16use actr_protocol::{ActorResult, ActrError, ActrId, DataStream, RpcEnvelope};
17use async_trait::async_trait;
18use bytes::Bytes;
19#[cfg(any(feature = "wasm-engine", feature = "dynclib-engine"))]
20use prost::Message;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::Arc;
24
25use crate::context::RuntimeContext;
26
27/// ABI-stable invocation context passed into guest runtime on each request.
28pub type InvocationContext = guest_abi::InvocationContextV1;
29
30/// Guest-initiated host operation carrying strong-typed ABI payloads.
31#[derive(Debug)]
32pub enum HostOperation {
33    Call(HostCallV1),
34    Tell(HostTellV1),
35    Discover(HostDiscoverV1),
36    CallRaw(HostCallRawV1),
37    RegisterStream(HostRegisterStreamV1),
38    UnregisterStream(HostUnregisterStreamV1),
39    SendDataStream(HostSendDataStreamV1),
40}
41
42/// Result of a host operation.
43#[derive(Debug)]
44pub enum HostOperationResult {
45    Bytes(Vec<u8>),
46    Done,
47    Error(i32),
48}
49
50/// Host-side async bridge used by guest runtimes.
51///
52/// Passed as `&HostAbiFn` through the dispatch path. Wrapped in an `Arc`
53/// (rather than the historical `Box`) so that the wasm Component Model
54/// host can clone the bridge into its `Store<HostState>` for the
55/// duration of a dispatch without forcing every call site to rebox —
56/// cloning an `Arc` is a refcount bump, safe to do per dispatch.
57pub type HostAbiFn = Arc<
58    dyn Fn(HostOperation) -> Pin<Box<dyn Future<Output = HostOperationResult> + Send>>
59        + Send
60        + Sync,
61>;
62
63/// Package-backed observation hook event.
64///
65/// Linked workloads receive observation hooks through [`LinkedHandleObserver`].
66/// Package-backed observers lower hook callbacks into this enum and serialize
67/// them through [`Workload::dispatch_hook_event`], which enters the Wasm /
68/// DynClib guest ABI.
69#[derive(Debug, Clone)]
70#[allow(dead_code)]
71pub(crate) enum PackageHookEvent {
72    SignalingConnecting,
73    SignalingConnected,
74    SignalingDisconnected,
75    WebSocketConnecting(PeerEvent),
76    WebSocketConnected(PeerEvent),
77    WebSocketDisconnected(PeerEvent),
78    WebRtcConnecting(PeerEvent),
79    WebRtcConnected(PeerEvent),
80    WebRtcDisconnected(PeerEvent),
81    CredentialRenewed(CredentialEvent),
82    CredentialExpiring(CredentialEvent),
83    MailboxBackpressure(BackpressureEvent),
84}
85
86impl PackageHookEvent {
87    pub(crate) fn request_id(&self) -> &'static str {
88        match self {
89            PackageHookEvent::SignalingConnecting => "hook:on_signaling_connecting",
90            PackageHookEvent::SignalingConnected => "hook:on_signaling_connected",
91            PackageHookEvent::SignalingDisconnected => "hook:on_signaling_disconnected",
92            PackageHookEvent::WebSocketConnecting(_) => "hook:on_websocket_connecting",
93            PackageHookEvent::WebSocketConnected(_) => "hook:on_websocket_connected",
94            PackageHookEvent::WebSocketDisconnected(_) => "hook:on_websocket_disconnected",
95            PackageHookEvent::WebRtcConnecting(_) => "hook:on_webrtc_connecting",
96            PackageHookEvent::WebRtcConnected(_) => "hook:on_webrtc_connected",
97            PackageHookEvent::WebRtcDisconnected(_) => "hook:on_webrtc_disconnected",
98            PackageHookEvent::CredentialRenewed(_) => "hook:on_credential_renewed",
99            PackageHookEvent::CredentialExpiring(_) => "hook:on_credential_expiring",
100            PackageHookEvent::MailboxBackpressure(_) => "hook:on_mailbox_backpressure",
101        }
102    }
103}
104
105/// Object-safe handle to a workload linked directly into the host process
106/// (e.g. an embedded Swift / Kotlin app, or a Rust process that owns the
107/// actor's business code as a struct rather than a packaged binary).
108///
109/// Plugged into a [`crate::Node`] via
110/// `Node::link_handle` (crate-internal object-safe path) or
111/// [`crate::Node::link`] (generic convenience that
112/// wraps any [`FrameworkWorkload`] implementation in a
113/// [`WorkloadAdapter`]).
114///
115/// A linked handle carries two responsibilities:
116///
117/// 1. **Observation hooks** — every method from
118///    [`actr_framework::Workload`]'s hook surface has an object-safe
119///    counterpart here. The runtime bridges these via
120///    [`LinkedHandleObserver`] into the internal
121///    [`crate::lifecycle::hooks::WorkloadHookObserver`] plumbing.
122/// 2. **Inbound RPC dispatch** — the [`LinkedWorkloadHandle::dispatch`]
123///    method is invoked by the node's `handle_incoming` path when the
124///    node has been linked through the host path. Package-backed
125///    attaches (`attach`) continue to dispatch through the WASM / dynclib
126///    guest ABI.
127#[async_trait]
128#[allow(dead_code)]
129pub(crate) trait LinkedWorkloadHandle: Send + Sync + 'static {
130    // Lifecycle (fallible). The node decides at each lifecycle phase whether
131    // an error aborts startup or is logged as best-effort observation.
132    async fn on_start(&self, _ctx: &RuntimeContext) -> ActorResult<()> {
133        Ok(())
134    }
135    async fn on_ready(&self, _ctx: &RuntimeContext) -> ActorResult<()> {
136        Ok(())
137    }
138    async fn on_stop(&self, _ctx: &RuntimeContext) -> ActorResult<()> {
139        Ok(())
140    }
141    async fn on_error(&self, _ctx: &RuntimeContext, _event: &ErrorEvent) -> ActorResult<()> {
142        Ok(())
143    }
144
145    // Signaling
146    async fn on_signaling_connecting(&self, _ctx: Option<&RuntimeContext>) {}
147    async fn on_signaling_connected(&self, _ctx: Option<&RuntimeContext>) {}
148    async fn on_signaling_disconnected(&self, _ctx: &RuntimeContext) {}
149
150    // WebSocket
151    async fn on_websocket_connecting(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
152    async fn on_websocket_connected(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
153    async fn on_websocket_disconnected(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
154
155    // WebRTC P2P
156    async fn on_webrtc_connecting(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
157    async fn on_webrtc_connected(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
158    async fn on_webrtc_disconnected(&self, _ctx: &RuntimeContext, _event: &PeerEvent) {}
159
160    // Credential
161    async fn on_credential_renewed(&self, _ctx: &RuntimeContext, _event: &CredentialEvent) {}
162    async fn on_credential_expiring(&self, _ctx: &RuntimeContext, _event: &CredentialEvent) {}
163
164    // Mailbox
165    async fn on_mailbox_backpressure(&self, _ctx: &RuntimeContext, _event: &BackpressureEvent) {}
166
167    /// Dispatch one inbound RPC envelope into the linked workload.
168    ///
169    /// The default implementation rejects the dispatch with
170    /// `ActrError::NotImplemented` so that handles concerned only with
171    /// observation hooks (e.g. adapter-only hosts) can be plugged in
172    /// without supplying a dispatcher. Generic linked attaches go
173    /// through [`WorkloadAdapter`], which overrides this method to call
174    /// into the framework's `MessageDispatcher`.
175    async fn dispatch(
176        &self,
177        _envelope: RpcEnvelope,
178        _ctx: Arc<RuntimeContext>,
179    ) -> ActorResult<Bytes> {
180        Err(ActrError::NotImplemented(
181            "linked workload handle has no dispatcher bound".to_string(),
182        ))
183    }
184}
185
186/// Generic bridge from a user-defined [`actr_framework::Workload`] (with
187/// its associated [`MessageDispatcher`]) to the object-safe
188/// [`LinkedWorkloadHandle`] stored on the node.
189///
190/// `WorkloadAdapter<W>` monomorphises the generic `<C: Context>` methods
191/// from the framework trait to the concrete [`RuntimeContext`] type the
192/// node carries, and forwards inbound RPC envelopes through
193/// `<W::Dispatcher as MessageDispatcher>::dispatch`.
194///
195/// Callers rarely construct this directly; prefer
196/// [`crate::Node::link`] which wraps the workload automatically.
197pub(crate) struct WorkloadAdapter<W: FrameworkWorkload> {
198    inner: Arc<W>,
199}
200
201impl<W: FrameworkWorkload> WorkloadAdapter<W> {
202    /// Wrap the workload in an adapter. Equivalent to
203    /// `Arc::new(WorkloadAdapter { inner: Arc::new(workload) })` but keeps
204    /// the field private so future refactors can change its shape.
205    pub fn new(workload: W) -> Arc<Self> {
206        Arc::new(Self {
207            inner: Arc::new(workload),
208        })
209    }
210
211    /// Test-friendly dispatch entry point: forwards to the workload's
212    /// [`MessageDispatcher`] using any `Context` implementation.
213    ///
214    /// The production path goes through
215    /// [`LinkedWorkloadHandle::dispatch`] with a concrete
216    /// [`RuntimeContext`], but that requires the full node plumbing.
217    /// Tests and alternate hosts can call this method with a lightweight
218    /// `Context` (e.g. `actr_framework::test_support::DummyContext`)
219    /// without standing up a running node.
220    pub async fn dispatch_with_ctx<C: actr_framework::Context>(
221        &self,
222        envelope: RpcEnvelope,
223        ctx: &C,
224    ) -> ActorResult<Bytes> {
225        <W::Dispatcher as MessageDispatcher>::dispatch(self.inner.as_ref(), envelope, ctx).await
226    }
227}
228
229#[async_trait]
230impl<W: FrameworkWorkload> LinkedWorkloadHandle for WorkloadAdapter<W> {
231    // ── Lifecycle ────────────────────────────────────────────────────────
232    async fn on_start(&self, ctx: &RuntimeContext) -> ActorResult<()> {
233        self.inner.on_start(ctx).await
234    }
235    async fn on_ready(&self, ctx: &RuntimeContext) -> ActorResult<()> {
236        self.inner.on_ready(ctx).await
237    }
238    async fn on_stop(&self, ctx: &RuntimeContext) -> ActorResult<()> {
239        self.inner.on_stop(ctx).await
240    }
241    async fn on_error(&self, ctx: &RuntimeContext, event: &ErrorEvent) -> ActorResult<()> {
242        self.inner.on_error(ctx, event).await
243    }
244
245    // ── Signaling ────────────────────────────────────────────────────────
246    async fn on_signaling_connecting(&self, ctx: Option<&RuntimeContext>) {
247        self.inner.on_signaling_connecting(ctx).await
248    }
249    async fn on_signaling_connected(&self, ctx: Option<&RuntimeContext>) {
250        self.inner.on_signaling_connected(ctx).await
251    }
252    async fn on_signaling_disconnected(&self, ctx: &RuntimeContext) {
253        self.inner.on_signaling_disconnected(ctx).await
254    }
255
256    // ── WebSocket ────────────────────────────────────────────────────────
257    async fn on_websocket_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
258        self.inner.on_websocket_connecting(ctx, event).await
259    }
260    async fn on_websocket_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
261        self.inner.on_websocket_connected(ctx, event).await
262    }
263    async fn on_websocket_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
264        self.inner.on_websocket_disconnected(ctx, event).await
265    }
266
267    // ── WebRTC P2P ───────────────────────────────────────────────────────
268    async fn on_webrtc_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
269        self.inner.on_webrtc_connecting(ctx, event).await
270    }
271    async fn on_webrtc_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
272        self.inner.on_webrtc_connected(ctx, event).await
273    }
274    async fn on_webrtc_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
275        self.inner.on_webrtc_disconnected(ctx, event).await
276    }
277
278    // ── Credential ───────────────────────────────────────────────────────
279    async fn on_credential_renewed(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
280        self.inner.on_credential_renewed(ctx, event).await
281    }
282    async fn on_credential_expiring(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
283        self.inner.on_credential_expiring(ctx, event).await
284    }
285
286    // ── Mailbox ──────────────────────────────────────────────────────────
287    async fn on_mailbox_backpressure(&self, ctx: &RuntimeContext, event: &BackpressureEvent) {
288        self.inner.on_mailbox_backpressure(ctx, event).await
289    }
290
291    // ── Dispatch ─────────────────────────────────────────────────────────
292    async fn dispatch(
293        &self,
294        envelope: RpcEnvelope,
295        ctx: Arc<RuntimeContext>,
296    ) -> ActorResult<Bytes> {
297        self.dispatch_with_ctx(envelope, ctx.as_ref()).await
298    }
299}
300
301/// Bridge adapter: forwards every [`LinkedWorkloadHandle`] method to the
302/// `pub(crate)` [`crate::lifecycle::hooks::WorkloadHookObserver`] expected by
303/// the hook dispatcher. Lets the public linked-handle trait live without
304/// exposing the internal hook plumbing.
305pub(crate) struct LinkedHandleObserver {
306    pub(crate) handle: Arc<dyn LinkedWorkloadHandle>,
307}
308
309#[async_trait]
310impl crate::lifecycle::hooks::WorkloadHookObserver for LinkedHandleObserver {
311    async fn on_start(&self, ctx: &RuntimeContext) -> ActorResult<()> {
312        self.handle.on_start(ctx).await
313    }
314    async fn on_ready(&self, ctx: &RuntimeContext) -> ActorResult<()> {
315        self.handle.on_ready(ctx).await
316    }
317    async fn on_stop(&self, ctx: &RuntimeContext) -> ActorResult<()> {
318        self.handle.on_stop(ctx).await
319    }
320    async fn on_error(&self, ctx: &RuntimeContext, event: &ErrorEvent) -> ActorResult<()> {
321        self.handle.on_error(ctx, event).await
322    }
323    async fn on_signaling_connecting(&self, ctx: Option<&RuntimeContext>) {
324        self.handle.on_signaling_connecting(ctx).await
325    }
326    async fn on_signaling_connected(&self, ctx: Option<&RuntimeContext>) {
327        self.handle.on_signaling_connected(ctx).await
328    }
329    async fn on_signaling_disconnected(&self, ctx: &RuntimeContext) {
330        self.handle.on_signaling_disconnected(ctx).await
331    }
332    async fn on_websocket_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
333        self.handle.on_websocket_connecting(ctx, event).await
334    }
335    async fn on_websocket_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
336        self.handle.on_websocket_connected(ctx, event).await
337    }
338    async fn on_websocket_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
339        self.handle.on_websocket_disconnected(ctx, event).await
340    }
341    async fn on_webrtc_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
342        self.handle.on_webrtc_connecting(ctx, event).await
343    }
344    async fn on_webrtc_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
345        self.handle.on_webrtc_connected(ctx, event).await
346    }
347    async fn on_webrtc_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
348        self.handle.on_webrtc_disconnected(ctx, event).await
349    }
350    async fn on_credential_renewed(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
351        self.handle.on_credential_renewed(ctx, event).await
352    }
353    async fn on_credential_expiring(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
354        self.handle.on_credential_expiring(ctx, event).await
355    }
356    async fn on_mailbox_backpressure(&self, ctx: &RuntimeContext, event: &BackpressureEvent) {
357        self.handle.on_mailbox_backpressure(ctx, event).await
358    }
359}
360
361/// Bridge adapter for package-backed workloads.
362///
363/// Attach installs this observer so the regular hook callback path can
364/// forward observation events into Wasm / DynClib guests through the same
365/// workload ABI used by lifecycle and dispatch entrypoints.
366pub(crate) struct PackageHookObserver {
367    pub(crate) workload_dispatch: Arc<tokio::sync::Mutex<Workload>>,
368}
369
370impl PackageHookObserver {
371    async fn dispatch_hook(
372        &self,
373        label: &'static str,
374        ctx: &RuntimeContext,
375        event: PackageHookEvent,
376    ) {
377        use actr_framework::Context as _;
378
379        let invocation = InvocationContext {
380            self_id: ctx.self_id().clone(),
381            caller_id: None,
382            request_id: event.request_id().to_string(),
383        };
384        let call_executor =
385            crate::lifecycle::node::lifecycle_host_abi(ctx.clone(), self.workload_dispatch.clone());
386        let mut workload = self.workload_dispatch.lock().await;
387        if let Err(e) = workload
388            .dispatch_hook_event(event, invocation, &call_executor)
389            .await
390        {
391            tracing::warn!(hook = label, error = %e, "workload package hook returned Err");
392        }
393    }
394}
395
396#[async_trait]
397impl crate::lifecycle::hooks::WorkloadHookObserver for PackageHookObserver {
398    async fn on_signaling_connecting(&self, ctx: Option<&RuntimeContext>) {
399        if let Some(ctx) = ctx {
400            self.dispatch_hook(
401                "on_signaling_connecting",
402                ctx,
403                PackageHookEvent::SignalingConnecting,
404            )
405            .await;
406        }
407    }
408
409    async fn on_signaling_connected(&self, ctx: Option<&RuntimeContext>) {
410        if let Some(ctx) = ctx {
411            self.dispatch_hook(
412                "on_signaling_connected",
413                ctx,
414                PackageHookEvent::SignalingConnected,
415            )
416            .await;
417        }
418    }
419
420    async fn on_signaling_disconnected(&self, ctx: &RuntimeContext) {
421        self.dispatch_hook(
422            "on_signaling_disconnected",
423            ctx,
424            PackageHookEvent::SignalingDisconnected,
425        )
426        .await;
427    }
428
429    async fn on_websocket_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
430        self.dispatch_hook(
431            "on_websocket_connecting",
432            ctx,
433            PackageHookEvent::WebSocketConnecting(event.clone()),
434        )
435        .await;
436    }
437
438    async fn on_websocket_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
439        self.dispatch_hook(
440            "on_websocket_connected",
441            ctx,
442            PackageHookEvent::WebSocketConnected(event.clone()),
443        )
444        .await;
445    }
446
447    async fn on_websocket_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
448        self.dispatch_hook(
449            "on_websocket_disconnected",
450            ctx,
451            PackageHookEvent::WebSocketDisconnected(event.clone()),
452        )
453        .await;
454    }
455
456    async fn on_webrtc_connecting(&self, ctx: &RuntimeContext, event: &PeerEvent) {
457        self.dispatch_hook(
458            "on_webrtc_connecting",
459            ctx,
460            PackageHookEvent::WebRtcConnecting(event.clone()),
461        )
462        .await;
463    }
464
465    async fn on_webrtc_connected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
466        self.dispatch_hook(
467            "on_webrtc_connected",
468            ctx,
469            PackageHookEvent::WebRtcConnected(event.clone()),
470        )
471        .await;
472    }
473
474    async fn on_webrtc_disconnected(&self, ctx: &RuntimeContext, event: &PeerEvent) {
475        self.dispatch_hook(
476            "on_webrtc_disconnected",
477            ctx,
478            PackageHookEvent::WebRtcDisconnected(event.clone()),
479        )
480        .await;
481    }
482
483    async fn on_credential_renewed(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
484        self.dispatch_hook(
485            "on_credential_renewed",
486            ctx,
487            PackageHookEvent::CredentialRenewed(event.clone()),
488        )
489        .await;
490    }
491
492    async fn on_credential_expiring(&self, ctx: &RuntimeContext, event: &CredentialEvent) {
493        self.dispatch_hook(
494            "on_credential_expiring",
495            ctx,
496            PackageHookEvent::CredentialExpiring(event.clone()),
497        )
498        .await;
499    }
500
501    async fn on_mailbox_backpressure(&self, ctx: &RuntimeContext, event: &BackpressureEvent) {
502        self.dispatch_hook(
503            "on_mailbox_backpressure",
504            ctx,
505            PackageHookEvent::MailboxBackpressure(*event),
506        )
507        .await;
508    }
509}
510
511/// Runtime workload enum.
512///
513/// Covers four attach flavours:
514///
515/// - `Wasm` / `DynClib` — a verified `.actr` package bound through
516///   [`crate::Node::attach`]. The package carries a guest binary that the
517///   host dispatches RPC envelopes into.
518/// - `Linked` — an in-process workload handle bound through
519///   [`crate::Node::link`]. Inbound RPC envelopes are forwarded to the
520///   handle via [`LinkedWorkloadHandle::dispatch`].
521#[allow(clippy::large_enum_variant)]
522pub(crate) enum Workload {
523    /// Linked in-process workload handle — hosts dispatch and lifecycle
524    /// hooks inside the current process without a packaged guest binary.
525    Linked(Arc<dyn LinkedWorkloadHandle>),
526    #[cfg(feature = "wasm-engine")]
527    Wasm(crate::wasm::WasmWorkload),
528    #[cfg(feature = "dynclib-engine")]
529    DynClib(crate::dynclib::DynClibWorkload),
530}
531
532impl std::fmt::Debug for Workload {
533    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534        match self {
535            Workload::Linked(_) => f.write_str("Workload::Linked(<dyn LinkedWorkloadHandle>)"),
536            #[cfg(feature = "wasm-engine")]
537            Workload::Wasm(w) => f.debug_tuple("Workload::Wasm").field(w).finish(),
538            #[cfg(feature = "dynclib-engine")]
539            Workload::DynClib(w) => f.debug_tuple("Workload::DynClib").field(w).finish(),
540        }
541    }
542}
543
544impl Workload {
545    /// Invoke the workload's `on_start` lifecycle hook.
546    pub(crate) fn on_start<'a>(
547        &'a mut self,
548        ctx: RuntimeContext,
549        invocation: InvocationContext,
550        host_abi: &'a HostAbiFn,
551    ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
552        Box::pin(async move {
553            let _ = (&invocation, host_abi);
554            match self {
555                Workload::Linked(handle) => handle.on_start(&ctx).await,
556                #[cfg(feature = "wasm-engine")]
557                Workload::Wasm(workload) => workload
558                    .call_on_start(invocation, host_abi)
559                    .await
560                    .map_err(|e| ActrError::Internal(format!("workload on_start failed: {e}"))),
561                #[cfg(feature = "dynclib-engine")]
562                Workload::DynClib(workload) => workload
563                    .call_on_start(invocation, host_abi)
564                    .await
565                    .map_err(|e| ActrError::Internal(format!("workload on_start failed: {e}"))),
566            }
567        })
568    }
569
570    /// Invoke the workload's `on_ready` lifecycle hook.
571    pub(crate) fn on_ready<'a>(
572        &'a mut self,
573        ctx: RuntimeContext,
574        invocation: InvocationContext,
575        host_abi: &'a HostAbiFn,
576    ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
577        Box::pin(async move {
578            let _ = (&invocation, host_abi);
579            match self {
580                Workload::Linked(handle) => handle.on_ready(&ctx).await,
581                #[cfg(feature = "wasm-engine")]
582                Workload::Wasm(workload) => workload
583                    .call_on_ready(invocation, host_abi)
584                    .await
585                    .map_err(|e| ActrError::Internal(format!("workload on_ready failed: {e}"))),
586                #[cfg(feature = "dynclib-engine")]
587                Workload::DynClib(workload) => workload
588                    .call_on_ready(invocation, host_abi)
589                    .await
590                    .map_err(|e| ActrError::Internal(format!("workload on_ready failed: {e}"))),
591            }
592        })
593    }
594
595    /// Invoke the workload's `on_stop` lifecycle hook.
596    pub(crate) fn on_stop<'a>(
597        &'a mut self,
598        ctx: RuntimeContext,
599        invocation: InvocationContext,
600        host_abi: &'a HostAbiFn,
601    ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
602        Box::pin(async move {
603            let _ = (&invocation, host_abi);
604            match self {
605                Workload::Linked(handle) => handle.on_stop(&ctx).await,
606                #[cfg(feature = "wasm-engine")]
607                Workload::Wasm(workload) => workload
608                    .call_on_stop(invocation, host_abi)
609                    .await
610                    .map_err(|e| ActrError::Internal(format!("workload on_stop failed: {e}"))),
611                #[cfg(feature = "dynclib-engine")]
612                Workload::DynClib(workload) => workload
613                    .call_on_stop(invocation, host_abi)
614                    .await
615                    .map_err(|e| ActrError::Internal(format!("workload on_stop failed: {e}"))),
616            }
617        })
618    }
619
620    /// Dispatch one inbound RPC envelope.
621    pub(crate) fn dispatch_envelope<'a>(
622        &'a mut self,
623        envelope: RpcEnvelope,
624        ctx: crate::context::RuntimeContext,
625        invocation: InvocationContext,
626        _host_abi: &'a HostAbiFn,
627    ) -> Pin<Box<dyn Future<Output = ActorResult<Bytes>> + Send + 'a>> {
628        Box::pin(async move {
629            let _ = &invocation;
630            match self {
631                Workload::Linked(handle) => handle.dispatch(envelope, Arc::new(ctx)).await,
632                #[cfg(feature = "wasm-engine")]
633                Workload::Wasm(workload) => {
634                    let request_bytes = envelope.encode_to_vec();
635                    workload
636                        .handle(&request_bytes, invocation, _host_abi)
637                        .await
638                        .map(Bytes::from)
639                        .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e}")))
640                }
641                #[cfg(feature = "dynclib-engine")]
642                Workload::DynClib(workload) => {
643                    let request_bytes = envelope.encode_to_vec();
644                    workload
645                        .handle(&request_bytes, invocation, _host_abi)
646                        .await
647                        .map(Bytes::from)
648                        .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e}")))
649                }
650            }
651        })
652    }
653
654    pub(crate) fn dispatch_data_stream<'a>(
655        &'a mut self,
656        chunk: DataStream,
657        sender: ActrId,
658        invocation: InvocationContext,
659        host_abi: &'a HostAbiFn,
660    ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
661        Box::pin(async move {
662            let _ = (&chunk, &sender, host_abi);
663            let _ = &invocation;
664            match self {
665                Workload::Linked(_) => {
666                    let _ = (&chunk, &sender, host_abi);
667                    Err(ActrError::NotImplemented(
668                        "linked workload stream callbacks are registered directly on RuntimeContext"
669                            .to_string(),
670                    ))
671                }
672                #[cfg(feature = "wasm-engine")]
673                Workload::Wasm(workload) => workload
674                    .handle_data_stream(chunk, sender, invocation, host_abi)
675                    .await
676                    .map_err(|e| {
677                        ActrError::Internal(format!("workload stream dispatch failed: {e}"))
678                    }),
679                #[cfg(feature = "dynclib-engine")]
680                Workload::DynClib(workload) => workload
681                    .handle_data_stream(chunk, sender, host_abi)
682                    .await
683                    .map_err(|e| {
684                        ActrError::Internal(format!("workload stream dispatch failed: {e}"))
685                    }),
686            }
687        })
688    }
689
690    /// Dispatch an observation hook into a package-backed workload.
691    ///
692    /// Linked workloads are intentionally a no-op here: they receive the same
693    /// events through `hook_observer`, and calling them here would duplicate
694    /// every hook invocation on the linked path.
695    pub(crate) fn dispatch_hook_event<'a>(
696        &'a mut self,
697        event: PackageHookEvent,
698        invocation: InvocationContext,
699        host_abi: &'a HostAbiFn,
700    ) -> Pin<Box<dyn Future<Output = ActorResult<()>> + Send + 'a>> {
701        Box::pin(async move {
702            let _ = (&event, &invocation, host_abi);
703            match self {
704                Workload::Linked(_) => Ok(()),
705                #[cfg(feature = "wasm-engine")]
706                Workload::Wasm(workload) => workload
707                    .call_hook_event(event, invocation, host_abi)
708                    .await
709                    .map_err(|e| ActrError::Internal(format!("workload hook failed: {e}"))),
710                #[cfg(feature = "dynclib-engine")]
711                Workload::DynClib(workload) => workload
712                    .call_hook_event(event, invocation, host_abi)
713                    .await
714                    .map_err(|e| ActrError::Internal(format!("workload hook failed: {e}"))),
715            }
716        })
717    }
718}
719
720// ─────────────────────────────────────────────────────────────────────────────
721// Shared host-side helpers
722// ─────────────────────────────────────────────────────────────────────────────
723
/// Decode a [`guest_abi::AbiFrame`] into a strongly-typed [`HostOperation`].
///
/// Only compiled with the `dynclib-engine` feature.
// NOTE(review): earlier docs called this shared with the WASM backend, but
// the cfg gate limits it to `dynclib-engine` builds — confirm which is meant.
///
/// # Errors
///
/// - `guest_abi::code::PROTOCOL_ERROR` when the frame's ABI version is not V1.
/// - `guest_abi::code::UNSUPPORTED_OP` when the op code is not a known host
///   operation.
/// - Whatever code the payload decoder reports when the payload is malformed.
#[cfg(feature = "dynclib-engine")]
pub(crate) fn decode_host_operation(frame: guest_abi::AbiFrame) -> Result<HostOperation, i32> {
    // Reject frames from any other ABI version before looking at the op.
    if frame.abi_version != guest_abi::version::V1 {
        return Err(guest_abi::code::PROTOCOL_ERROR);
    }

    let raw = &frame.payload;
    let operation = match frame.op {
        guest_abi::op::HOST_CALL => {
            HostOperation::Call(<HostCallV1 as AbiPayload>::decode_payload(raw)?)
        }
        guest_abi::op::HOST_TELL => {
            HostOperation::Tell(<HostTellV1 as AbiPayload>::decode_payload(raw)?)
        }
        guest_abi::op::HOST_CALL_RAW => {
            HostOperation::CallRaw(<HostCallRawV1 as AbiPayload>::decode_payload(raw)?)
        }
        guest_abi::op::HOST_DISCOVER => {
            HostOperation::Discover(<HostDiscoverV1 as AbiPayload>::decode_payload(raw)?)
        }
        guest_abi::op::HOST_REGISTER_STREAM => HostOperation::RegisterStream(
            <HostRegisterStreamV1 as AbiPayload>::decode_payload(raw)?,
        ),
        guest_abi::op::HOST_UNREGISTER_STREAM => HostOperation::UnregisterStream(
            <HostUnregisterStreamV1 as AbiPayload>::decode_payload(raw)?,
        ),
        guest_abi::op::HOST_SEND_DATA_STREAM => HostOperation::SendDataStream(
            <HostSendDataStreamV1 as AbiPayload>::decode_payload(raw)?,
        ),
        _ => return Err(guest_abi::code::UNSUPPORTED_OP),
    };
    Ok(operation)
}
765
/// Serialize an inbound guest dispatch as a `GuestHandleV1` inside an `AbiFrame`.
///
/// `request_bytes` is copied into the request's `rpc_envelope` field and `ctx`
/// is attached as the invocation context. The request is then framed and
/// encoded with `guest_abi::encode_message`.
///
/// # Errors
///
/// Returns the `i32` code reported by framing or by message encoding.
#[cfg(feature = "dynclib-engine")]
pub(crate) fn encode_guest_handle_request(
    request_bytes: &[u8],
    ctx: InvocationContext,
) -> Result<Vec<u8>, i32> {
    let rpc_envelope = request_bytes.to_vec();
    let frame = GuestHandleV1 { ctx, rpc_envelope }.to_frame()?;
    guest_abi::encode_message(&frame)
}
779
/// Serialize a data-stream delivery as a `GuestDataStreamV1` inside an `AbiFrame`.
///
/// `chunk` and `sender` are moved into the request, which is then framed and
/// encoded with `guest_abi::encode_message`.
///
/// # Errors
///
/// Returns the `i32` code reported by framing or by message encoding.
#[cfg(feature = "dynclib-engine")]
pub(crate) fn encode_guest_data_stream_request(
    chunk: DataStream,
    sender: ActrId,
) -> Result<Vec<u8>, i32> {
    let frame = guest_abi::GuestDataStreamV1 { chunk, sender }.to_frame()?;
    guest_abi::encode_message(&frame)
}
789
/// Serialize a host-to-guest lifecycle call as a `GuestLifecycleV1` inside an `AbiFrame`.
///
/// `hook` is the numeric lifecycle hook identifier and `ctx` is the invocation
/// context delivered with it.
///
/// # Errors
///
/// Returns the `i32` code reported by framing or by message encoding.
#[cfg(feature = "dynclib-engine")]
pub(crate) fn encode_guest_lifecycle_request(
    hook: u32,
    ctx: InvocationContext,
) -> Result<Vec<u8>, i32> {
    let frame = guest_abi::GuestLifecycleV1 { ctx, hook }.to_frame()?;
    guest_abi::encode_message(&frame)
}
800
/// Convert a [`std::time::SystemTime`] into the ABI timestamp representation.
///
/// The result holds whole seconds and sub-second nanoseconds elapsed since the
/// Unix epoch. A `time` earlier than the epoch maps to the zero timestamp.
#[cfg(feature = "dynclib-engine")]
fn timestamp_to_v1(time: std::time::SystemTime) -> guest_abi::TimestampV1 {
    let since_epoch = match time.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // `time` precedes the epoch: fall back to a zero duration.
        Err(_) => std::time::Duration::default(),
    };
    let seconds = since_epoch.as_secs();
    let nanoseconds = since_epoch.subsec_nanos();
    guest_abi::TimestampV1 {
        seconds,
        nanoseconds,
    }
}
811
/// Convert a framework [`PeerEvent`] into its ABI counterpart, carrying over
/// the `peer` and `relayed` fields unchanged.
#[cfg(feature = "dynclib-engine")]
fn peer_event_to_v1(event: PeerEvent) -> guest_abi::PeerEventV1 {
    let PeerEvent { peer, relayed, .. } = event;
    guest_abi::PeerEventV1 { peer, relayed }
}
819
/// Convert a framework [`CredentialEvent`] into its ABI counterpart; the new
/// expiry is translated with [`timestamp_to_v1`].
#[cfg(feature = "dynclib-engine")]
fn credential_event_to_v1(event: CredentialEvent) -> guest_abi::CredentialEventV1 {
    let new_expiry = timestamp_to_v1(event.new_expiry);
    guest_abi::CredentialEventV1 { new_expiry }
}
826
/// Convert a framework [`BackpressureEvent`] into its ABI counterpart, casting
/// the queue length and threshold to `u64`.
#[cfg(feature = "dynclib-engine")]
fn backpressure_event_to_v1(event: BackpressureEvent) -> guest_abi::BackpressureEventV1 {
    let queue_len = event.queue_len as u64;
    let threshold = event.threshold as u64;
    guest_abi::BackpressureEventV1 {
        queue_len,
        threshold,
    }
}
834
/// Serialize a host-to-guest observation hook as a `GuestHookV1` inside an `AbiFrame`.
///
/// Each [`PackageHookEvent`] variant maps to one `guest_abi::runtime_hook`
/// identifier. Depending on the variant, at most one optional payload is
/// attached:
///
/// * signaling events carry no payload;
/// * WebSocket and WebRTC events carry the converted peer event;
/// * credential events carry the converted credential event;
/// * mailbox backpressure carries the converted backpressure event.
///
/// # Errors
///
/// Returns the `i32` code reported by framing or by message encoding.
#[cfg(feature = "dynclib-engine")]
pub(crate) fn encode_guest_hook_request(
    event: PackageHookEvent,
    ctx: InvocationContext,
) -> Result<Vec<u8>, i32> {
    // Resolve the event into (hook id, peer, credential, backpressure) in one
    // exhaustive match so a new variant cannot be silently ignored.
    let (hook, peer, credential, backpressure) = match event {
        PackageHookEvent::SignalingConnecting => (
            guest_abi::runtime_hook::ON_SIGNALING_CONNECTING,
            None,
            None,
            None,
        ),
        PackageHookEvent::SignalingConnected => (
            guest_abi::runtime_hook::ON_SIGNALING_CONNECTED,
            None,
            None,
            None,
        ),
        PackageHookEvent::SignalingDisconnected => (
            guest_abi::runtime_hook::ON_SIGNALING_DISCONNECTED,
            None,
            None,
            None,
        ),
        PackageHookEvent::WebSocketConnecting(peer_event) => (
            guest_abi::runtime_hook::ON_WEBSOCKET_CONNECTING,
            Some(peer_event_to_v1(peer_event)),
            None,
            None,
        ),
        PackageHookEvent::WebSocketConnected(peer_event) => (
            guest_abi::runtime_hook::ON_WEBSOCKET_CONNECTED,
            Some(peer_event_to_v1(peer_event)),
            None,
            None,
        ),
        PackageHookEvent::WebSocketDisconnected(peer_event) => (
            guest_abi::runtime_hook::ON_WEBSOCKET_DISCONNECTED,
            Some(peer_event_to_v1(peer_event)),
            None,
            None,
        ),
        PackageHookEvent::WebRtcConnecting(peer_event) => (
            guest_abi::runtime_hook::ON_WEBRTC_CONNECTING,
            Some(peer_event_to_v1(peer_event)),
            None,
            None,
        ),
        PackageHookEvent::WebRtcConnected(peer_event) => (
            guest_abi::runtime_hook::ON_WEBRTC_CONNECTED,
            Some(peer_event_to_v1(peer_event)),
            None,
            None,
        ),
        PackageHookEvent::WebRtcDisconnected(peer_event) => (
            guest_abi::runtime_hook::ON_WEBRTC_DISCONNECTED,
            Some(peer_event_to_v1(peer_event)),
            None,
            None,
        ),
        PackageHookEvent::CredentialRenewed(credential_event) => (
            guest_abi::runtime_hook::ON_CREDENTIAL_RENEWED,
            None,
            Some(credential_event_to_v1(credential_event)),
            None,
        ),
        PackageHookEvent::CredentialExpiring(credential_event) => (
            guest_abi::runtime_hook::ON_CREDENTIAL_EXPIRING,
            None,
            Some(credential_event_to_v1(credential_event)),
            None,
        ),
        PackageHookEvent::MailboxBackpressure(backpressure_event) => (
            guest_abi::runtime_hook::ON_MAILBOX_BACKPRESSURE,
            None,
            None,
            Some(backpressure_event_to_v1(backpressure_event)),
        ),
    };

    let request = GuestHookV1 {
        ctx,
        hook,
        peer,
        credential,
        backpressure,
    };
    let frame = request.to_frame()?;
    guest_abi::encode_message(&frame)
}
900
901/// Decode guest-encoded [`DestV1`] back to [`actr_framework::Dest`].
902///
903/// Re-exported from `actr_framework::guest::dynclib_abi` for host-side convenience.
904pub(crate) fn decode_dest(
905    v1: &actr_framework::guest::dynclib_abi::DestV1,
906) -> Option<actr_framework::Dest> {
907    actr_framework::guest::dynclib_abi::dest_v1_to_dest(v1)
908}
909
#[cfg(test)]
mod tests {
    use super::*;
    use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
    use crate::lifecycle::hooks::{
        HookContextBuilder, WorkloadHookObserverRef, build_hook_callback,
    };
    use crate::outbound::{Gate, HostGate};
    use crate::transport::HostTransport;
    use crate::wire::webrtc::{
        HookEvent, ReconnectConfig, SignalingClient, SignalingConfig, WebSocketSignalingClient,
    };
    use actr_framework::Context as FrameworkContext;
    use actr_framework::test_support::DummyContext;
    use actr_protocol::{AIdCredential, ActrId, ActrType, Realm};
    use tokio::sync::mpsc;

    // ── Fixtures ─────────────────────────────────────────────────────────────

    /// Build a test actor id in realm 1 with the given serial number.
    fn make_id(serial: u64) -> ActrId {
        let actor_type = ActrType {
            manufacturer: String::from("test"),
            name: String::from("UnitTestActor"),
            version: String::from("0.0.1"),
        };
        ActrId {
            realm: Realm { realm_id: 1 },
            serial_number: serial,
            r#type: actor_type,
        }
    }

    /// Fixed placeholder credential: key id 1, static claims, 64 zero bytes as signature.
    fn test_credential() -> AIdCredential {
        let claims = Bytes::from_static(b"claims");
        let signature = Bytes::from(vec![0u8; 64]);
        AIdCredential {
            key_id: 1,
            claims,
            signature,
        }
    }

    /// Assemble a [`RuntimeContext`] for actor `serial` backed by a host gate
    /// and a WebSocket signaling client configured for `ws://127.0.0.1:9`.
    fn test_runtime_context(serial: u64) -> RuntimeContext {
        let transport = Arc::new(HostTransport::new());
        let gate = Gate::Host(Arc::new(HostGate::new(transport)));
        let signaling_config = SignalingConfig {
            server_url: url::Url::parse("ws://127.0.0.1:9").expect("valid test URL"),
            connection_timeout: 1,
            heartbeat_interval: 30,
            reconnect_config: ReconnectConfig::default(),
            auth_config: None,
            webrtc_role: None,
        };
        let signaling: Arc<dyn SignalingClient> =
            Arc::new(WebSocketSignalingClient::new(signaling_config));

        RuntimeContext::new(
            make_id(serial),
            None,
            "workload-test".to_string(),
            gate,
            None,
            Arc::new(DataStreamRegistry::new()),
            Arc::new(MediaFrameRegistry::new()),
            signaling,
            test_credential(),
            None,
        )
    }

    /// Invocation context for actor `serial` with no caller and the given request id.
    fn invocation_for(serial: u64, request_id: &str) -> InvocationContext {
        InvocationContext {
            self_id: make_id(serial),
            caller_id: None,
            request_id: request_id.to_string(),
        }
    }

    /// Host bridge that answers every operation with `HostOperationResult::Done`.
    fn noop_host_abi() -> HostAbiFn {
        Arc::new(|_| Box::pin(async { HostOperationResult::Done }))
    }

    /// RPC envelope with the given ids and payload; remaining fields are defaulted.
    fn envelope_for(request_id: &str, route_key: &str, payload: Bytes) -> RpcEnvelope {
        RpcEnvelope {
            request_id: request_id.to_string(),
            route_key: route_key.to_string(),
            payload: Some(payload),
            ..Default::default()
        }
    }

    /// Assert that `err` is the `Internal` error raised by `LifecycleFailingWorkload::on_start`.
    fn assert_on_start_failure(err: ActrError) {
        match err {
            ActrError::Internal(msg) => {
                assert!(msg.contains("on_start failed"), "unexpected message: {msg}")
            }
            other => panic!("expected Internal, got {other:?}"),
        }
    }

    // ── Minimal Workload + Dispatcher used for adapter tests ────────────────

    /// Workload whose `echo` route replies with the payload followed by `suffix`.
    struct EchoWorkload {
        suffix: String,
    }

    #[async_trait]
    impl FrameworkWorkload for EchoWorkload {
        type Dispatcher = EchoDispatcher;
    }

    /// Dispatcher for [`EchoWorkload`]; only the `echo` route is known.
    struct EchoDispatcher;

    #[async_trait]
    impl MessageDispatcher for EchoDispatcher {
        type Workload = EchoWorkload;

        async fn dispatch<C: FrameworkContext>(
            workload: &Self::Workload,
            envelope: RpcEnvelope,
            _ctx: &C,
        ) -> ActorResult<Bytes> {
            match envelope.route_key.as_str() {
                "echo" => {
                    // A missing payload is treated as the empty string.
                    let payload = match envelope.payload.as_ref() {
                        Some(raw) => String::from_utf8_lossy(raw).to_string(),
                        None => String::default(),
                    };
                    let reply = format!("{payload}{}", workload.suffix);
                    Ok(Bytes::from(reply.into_bytes()))
                }
                other => Err(ActrError::InvalidArgument(format!(
                    "unknown route: {other}"
                ))),
            }
        }
    }

    /// Workload whose `on_start` hook always fails with an `Internal` error.
    struct LifecycleFailingWorkload;

    #[async_trait]
    impl FrameworkWorkload for LifecycleFailingWorkload {
        type Dispatcher = LifecycleFailingDispatcher;

        async fn on_start<C: FrameworkContext>(&self, _ctx: &C) -> ActorResult<()> {
            Err(ActrError::Internal("on_start failed".to_string()))
        }
    }

    /// Dispatcher for [`LifecycleFailingWorkload`]; replies with empty bytes.
    struct LifecycleFailingDispatcher;

    #[async_trait]
    impl MessageDispatcher for LifecycleFailingDispatcher {
        type Workload = LifecycleFailingWorkload;

        async fn dispatch<C: FrameworkContext>(
            _workload: &Self::Workload,
            _envelope: RpcEnvelope,
            _ctx: &C,
        ) -> ActorResult<Bytes> {
            Ok(Bytes::new())
        }
    }

    /// Workload that reports each observed hook as a string on `tx`.
    struct RecordingWorkload {
        tx: mpsc::UnboundedSender<String>,
    }

    #[async_trait]
    impl FrameworkWorkload for RecordingWorkload {
        type Dispatcher = RecordingDispatcher;

        async fn on_ready<C: FrameworkContext>(&self, ctx: &C) -> ActorResult<()> {
            let serial = ctx.self_id().serial_number;
            // A closed receiver is not an error for the workload itself.
            let _ = self.tx.send(format!("on_ready:self={}", serial));
            Ok(())
        }

        async fn on_stop<C: FrameworkContext>(&self, ctx: &C) -> ActorResult<()> {
            let serial = ctx.self_id().serial_number;
            let _ = self.tx.send(format!("on_stop:self={}", serial));
            Ok(())
        }

        async fn on_websocket_connected<C: FrameworkContext>(&self, ctx: &C, event: &PeerEvent) {
            let relayed = event
                .relayed
                .map_or("none", |flag| if flag { "true" } else { "false" });
            let line = format!(
                "on_websocket_connected:self={}:peer={}:relayed={}",
                ctx.self_id().serial_number,
                event.peer.serial_number,
                relayed
            );
            let _ = self.tx.send(line);
        }
    }

    /// Dispatcher for [`RecordingWorkload`]; replies with empty bytes.
    struct RecordingDispatcher;

    #[async_trait]
    impl MessageDispatcher for RecordingDispatcher {
        type Workload = RecordingWorkload;

        async fn dispatch<C: FrameworkContext>(
            _workload: &Self::Workload,
            _envelope: RpcEnvelope,
            _ctx: &C,
        ) -> ActorResult<Bytes> {
            Ok(Bytes::new())
        }
    }

    /// Wait up to one second for the next recorded hook and compare it with `expected`.
    async fn expect_recorded(rx: &mut mpsc::UnboundedReceiver<String>, expected: &'static str) {
        let deadline = std::time::Duration::from_secs(1);
        let next = tokio::time::timeout(deadline, rx.recv()).await;
        let observed = next
            .expect("linked hook was not called")
            .expect("recording workload dropped");
        assert_eq!(observed, expected);
    }

    // ── Tests ────────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn adapter_dispatch_routes_to_workload_dispatcher() {
        let adapter = WorkloadAdapter::new(EchoWorkload {
            suffix: "-ok".to_string(),
        });
        let ctx = DummyContext::new(make_id(42));
        let request = envelope_for("r1", "echo", Bytes::from_static(b"hello"));

        let resp = adapter
            .dispatch_with_ctx(request, &ctx)
            .await
            .expect("dispatch must succeed");

        assert_eq!(&resp[..], b"hello-ok");
    }

    #[tokio::test]
    async fn adapter_dispatch_propagates_unknown_route_error() {
        let adapter = WorkloadAdapter::new(EchoWorkload {
            suffix: "-ok".to_string(),
        });
        let ctx = DummyContext::new(make_id(1));
        let request = envelope_for("r2", "does/not/exist", Bytes::new());

        let err = adapter
            .dispatch_with_ctx(request, &ctx)
            .await
            .expect_err("unknown route must error");

        match err {
            ActrError::InvalidArgument(msg) => {
                assert!(msg.contains("unknown route"), "unexpected message: {msg}")
            }
            other => panic!("expected InvalidArgument, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn adapter_on_start_propagates_workload_error() {
        let adapter = WorkloadAdapter::new(LifecycleFailingWorkload);
        let ctx = test_runtime_context(7);

        let err = adapter
            .on_start(&ctx)
            .await
            .expect_err("adapter must preserve lifecycle errors");

        assert_on_start_failure(err);
    }

    #[tokio::test]
    async fn workload_on_start_propagates_linked_error() {
        let handle: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(LifecycleFailingWorkload);
        let mut workload = Workload::Linked(handle);
        let ctx = test_runtime_context(8);
        let invocation = invocation_for(8, "lifecycle:on_start");
        let host_abi = noop_host_abi();

        let err = workload
            .on_start(ctx, invocation, &host_abi)
            .await
            .expect_err("workload lifecycle must preserve linked errors");

        assert_on_start_failure(err);
    }

    #[tokio::test]
    async fn workload_on_ready_and_on_stop_reach_linked_workload() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let handle: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(RecordingWorkload { tx });
        let mut workload = Workload::Linked(handle);
        let host_abi = noop_host_abi();

        workload
            .on_ready(
                test_runtime_context(9),
                invocation_for(9, "lifecycle:on_ready"),
                &host_abi,
            )
            .await
            .expect("linked on_ready should dispatch");
        workload
            .on_stop(
                test_runtime_context(9),
                invocation_for(9, "lifecycle:on_stop"),
                &host_abi,
            )
            .await
            .expect("linked on_stop should dispatch");

        // Events must arrive in the order the hooks were invoked.
        expect_recorded(&mut rx, "on_ready:self=9").await;
        expect_recorded(&mut rx, "on_stop:self=9").await;
    }

    #[tokio::test]
    async fn hook_callback_reaches_linked_workload_once() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let handle: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(RecordingWorkload { tx });
        let observer: WorkloadHookObserverRef = Arc::new(LinkedHandleObserver { handle });
        let base_ctx = test_runtime_context(10);
        let ctx_builder: HookContextBuilder = Arc::new(move || {
            let snapshot = base_ctx.clone();
            Box::pin(async move { Some(snapshot) })
        });
        let callback = build_hook_callback(Some(observer), ctx_builder);

        let event = HookEvent::WebSocketConnected {
            peer_id: make_id(42),
        };
        callback(event).await;

        expect_recorded(
            &mut rx,
            "on_websocket_connected:self=10:peer=42:relayed=none",
        )
        .await;

        // Give any stray duplicate delivery a chance to run before checking.
        for _ in 0..2 {
            tokio::task::yield_now().await;
        }
        assert!(
            rx.try_recv().is_err(),
            "linked workload should receive exactly one hook event"
        );
    }

    /// Compile-time guard: `LinkedWorkloadHandle` must remain usable as a
    /// trait object, so adding a non-object-safe method to it breaks this test.
    #[test]
    fn linked_workload_handle_is_object_safe() {
        fn accepts(_: Arc<dyn LinkedWorkloadHandle>) {}

        let adapter: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(EchoWorkload {
            suffix: "-ok".to_string(),
        });
        accepts(adapter);
    }

    /// The `Debug` output of a linked workload must keep its `Workload::Linked` prefix.
    #[test]
    fn linked_workload_debug_surface() {
        let handle: Arc<dyn LinkedWorkloadHandle> = WorkloadAdapter::new(EchoWorkload {
            suffix: "-ok".to_string(),
        });
        let linked = Workload::Linked(handle);
        let rendered = format!("{:?}", linked);
        assert!(rendered.starts_with("Workload::Linked"));
    }
}