Skip to main content

actr_hyper/lifecycle/
node.rs

1//! Node runtime inner — holds all running-state fields for an attached node.
2//!
3//! This module is the internal implementation backing the public
4//! `Node<Attached>` / `Node<Registered>` typestate chain defined in
5//! `crate::lib`. The struct itself is crate-private; consumers interact with
6//! it indirectly through `Node<S>` → `ActrRef` transitions.
7
8use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21    AIdCredential, ActorResult, ActrError, ActrId, PayloadType, RegisterAuthMode, RegisterRequest,
22    RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
33/// Internal running-state of an attached node.
34///
35/// Holds every field required to run a workload after `Hyper::attach` has
36/// bound a package. Kept private to the crate: external callers use the
37/// public `Node<S>` wrappers in `crate::lib` and the `ActrRef` handle
38/// returned by `Node::start`.
39pub(crate) struct Inner {
40    /// Runtime configuration
41    pub(crate) config: actr_config::RuntimeConfig,
42
43    /// SQLite persistent mailbox
44    pub(crate) mailbox: Arc<dyn Mailbox>,
45
46    /// Dead Letter Queue for poison messages
47    pub(crate) dlq: Arc<dyn DeadLetterQueue>,
48
49    /// In-process gate for `Dest::Shell` / `Dest::Local` calls.
50    ///
51    /// Created in `build()` together with `shell_to_workload` so the inproc
52    /// lane is usable as soon as the node exists, even before registration.
53    pub(crate) inproc_gate: Gate,
54
55    /// Cross-process gate for `Dest::Actor(_)` calls.
56    ///
57    /// `None` until `start()` finishes WebRTC / PeerGate initialization. Any
58    /// outbound call issued before that point returns `Internal("PeerGate
59    /// not initialized yet")` — see `RuntimeContext::select_gate`.
60    pub(crate) outproc_gate: Option<Gate>,
61
62    /// DataStream callback registry shared between the inbound WebRTC / WS
63    /// gates (which dispatch into it) and `RuntimeContext`
64    /// (register_stream / send_data_stream).
65    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,
66
67    /// MediaTrack callback registry shared between WebRTC media tracks and
68    /// `RuntimeContext` (register_media_track / send_media_sample).
69    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,
70
71    /// Signaling client
72    pub(crate) signaling_client: Arc<dyn SignalingClient>,
73
74    /// Actor ID (obtained after startup)
75    pub(crate) actor_id: Option<ActrId>,
76
77    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
78    pub(crate) credential_state: Option<CredentialState>,
79
80    /// WebRTC coordinator (created after startup)
81    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
82
83    /// Peer transport manager (created after startup)
84    pub(crate) peer_transport: Option<Arc<crate::transport::PeerTransport>>,
85
86    /// WebRTC Gate (created after startup)
87    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
88
89    /// WebSocket Gate (direct-connect mode inbound, optional)
90    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,
91
92    /// Shell → Workload transport (REQUEST direction)
93    ///
94    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
95    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,
96
97    /// Workload → Shell transport (RESPONSE direction)
98    ///
99    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
100    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,
101
102    /// Shutdown token for graceful shutdown
103    pub(crate) shutdown_token: CancellationToken,
104
105    /// Packaged manifest.lock.toml content loaded at startup for fingerprint lookups.
106    ///
107    /// Wrapped in `Arc` so per-request `RuntimeContext` clones only bump a refcount
108    /// instead of deep-cloning the dependency vector.
109    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,
110    /// Network event receiver (from NetworkEventHandle)
111    pub(crate) network_event_rx:
112        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>>,
113
114    /// Network event debounce configuration
115    pub(crate) network_event_debounce_config:
116        Option<crate::lifecycle::network_event::DebounceConfig>,
117
118    /// Request deduplication state (15 s TTL response cache, prevents double-processing on retry)
119    pub(crate) dedup_state: Arc<Mutex<DedupState>>,
120
121    /// Verified package manifest for package-backed nodes.
122    #[allow(dead_code)]
123    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,
124
125    /// Pre-issued registration credential injected by the Hyper layer during
126    /// the `Attached → Registered` state transition. `start()` uses it directly
127    /// instead of re-registering with the signaling server.
128    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,
129
130    /// Shared WebSocket direct-connect address map populated by discovery
131    ///
132    /// Shared with `DefaultWireBuilder` so discovered ws:// URLs can be reused
133    /// directly instead of relying on a static url_template
134    /// The map is keyed by `ActrId`.
135    pub(crate) discovered_ws_addresses:
136        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,
137
138    /// Runtime workload (WASM, dynclib, etc.)
139    ///
140    /// `handle_incoming` dispatches through this workload.
141    ///
142    /// The `Mutex` serializes dispatch into a single guest actor instance:
143    /// `WasmWorkload::handle` and `DynClibWorkload::handle` both take
144    /// `&mut self` because the underlying Wasmtime `Store` / native guest
145    /// ABI is single-threaded, so concurrent dispatch through the same
146    /// instance would be unsound. Lifecycle hooks also take this lock because
147    /// package-backed WASM / dynclib workloads expose them on the same guest
148    /// instance; transport and other observation hooks reach linked workloads
149    /// through `hook_observer` without holding this lock.
150    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
151
152    /// Optional shell-side observer that receives linked-workload transport /
153    /// credential / mailbox hook invocations.
154    ///
155    /// `None` means "no observer installed"; the built-in tracing defaults
156    /// still fire from the event-source wiring sites. When `Some`, hook
157    /// invocations are dispatched through `lifecycle::hooks::spawn_hook`
158    /// so panics in observer code cannot unwind into the event source.
159    #[allow(dead_code)]
160    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,
161
162    /// Queue-length threshold at which the mailbox backpressure
163    /// watchdog fires the framework `on_mailbox_backpressure` hook.
164    ///
165    /// Resolved from [`HyperConfig`] at node construction time so the
166    /// runtime loop does not need to hold a reference back to `HyperConfig`.
167    pub(crate) mailbox_backpressure_threshold: usize,
168
169    /// Lead time before credential expiry at which the framework fires
170    /// the `on_credential_expiring` hook. Resolved from [`HyperConfig`]
171    /// at node construction time.
172    #[allow(dead_code)]
173    pub(crate) credential_expiry_warning: Duration,
174}
175
176/// Credential state for shared access between tasks
177#[derive(Clone)]
178pub struct CredentialState {
179    inner: Arc<RwLock<CredentialStateInner>>,
180}
181
182#[derive(Clone)]
183struct CredentialStateInner {
184    credential: AIdCredential,
185    expires_at: Option<prost_types::Timestamp>,
186    /// HMAC time-limited TURN credential, updated together with credential on registration/renewal
187    turn_credential: Option<TurnCredential>,
188}
189
190impl CredentialState {
191    /// Create a new CredentialState with TURN credential
192    pub fn new(
193        credential: AIdCredential,
194        expires_at: Option<prost_types::Timestamp>,
195        turn_credential: Option<TurnCredential>,
196    ) -> Self {
197        Self {
198            inner: Arc::new(RwLock::new(CredentialStateInner {
199                credential,
200                expires_at,
201                turn_credential,
202            })),
203        }
204    }
205
206    pub async fn credential(&self) -> AIdCredential {
207        self.inner.read().await.credential.clone()
208    }
209
210    pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
211        self.inner.read().await.expires_at
212    }
213
214    /// Get TURN credential (HMAC time-limited credential)
215    pub async fn turn_credential(&self) -> Option<TurnCredential> {
216        self.inner.read().await.turn_credential.clone()
217    }
218
219    /// Update credential and TURN credential
220    ///
221    /// Called on credential renewal; only overwrites the old TURN credential when the new one is not empty
222    pub(crate) async fn update(
223        &self,
224        credential: AIdCredential,
225        expires_at: Option<prost_types::Timestamp>,
226        turn_credential: Option<TurnCredential>,
227    ) {
228        let mut guard = self.inner.write().await;
229        guard.credential = credential;
230        guard.expires_at = expires_at;
231        if turn_credential.is_some() {
232            guard.turn_credential = turn_credential;
233        }
234    }
235}
236
237/// Host operation executor - routes guest outbound calls through RuntimeContext
238///
239/// Called by the workload dispatch path in `handle_incoming`.
240async fn host_operation_handler(
241    ctx: crate::context::RuntimeContext,
242    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
243    pending: crate::workload::HostOperation,
244) -> crate::workload::HostOperationResult {
245    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
246    use actr_framework::guest::dynclib_abi::code as abi_code;
247    use actr_framework::{Context as _, Dest};
248    use actr_protocol::{DataStream, PayloadType};
249
250    /// Map `ActrError` to ABI error code, preserving semantics for guest-side discrimination
251    fn actr_error_to_code(err: &ActrError) -> i32 {
252        match err {
253            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
254            _ => abi_code::GENERIC_ERROR,
255        }
256    }
257
258    match pending {
259        HostOperation::CallRaw(req) => {
260            match ctx
261                .call_raw(
262                    &Dest::Actor(req.target),
263                    req.route_key,
264                    PayloadType::RpcReliable,
265                    bytes::Bytes::from(req.payload),
266                    30_000,
267                )
268                .await
269            {
270                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
271                Err(e) => {
272                    tracing::error!("call_raw routing failed: {e:?}");
273                    HostOperationResult::Error(actr_error_to_code(&e))
274                }
275            }
276        }
277
278        HostOperation::Call(req) => {
279            let dest = match decode_dest(&req.dest) {
280                Some(d) => d,
281                None => {
282                    tracing::error!(route_key = req.route_key, "call: dest decode failed");
283                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
284                }
285            };
286            match ctx
287                .call_raw(
288                    &dest,
289                    req.route_key,
290                    PayloadType::RpcReliable,
291                    bytes::Bytes::from(req.payload),
292                    30_000,
293                )
294                .await
295            {
296                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
297                Err(e) => {
298                    tracing::error!("call routing failed: {e:?}");
299                    HostOperationResult::Error(actr_error_to_code(&e))
300                }
301            }
302        }
303
304        HostOperation::Tell(req) => {
305            let dest = match decode_dest(&req.dest) {
306                Some(d) => d,
307                None => {
308                    tracing::error!(route_key = req.route_key, "tell: dest decode failed");
309                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
310                }
311            };
312            match ctx
313                .tell_raw(
314                    &dest,
315                    req.route_key,
316                    PayloadType::RpcReliable,
317                    bytes::Bytes::from(req.payload),
318                )
319                .await
320            {
321                Ok(()) => HostOperationResult::Done,
322                Err(e) => {
323                    tracing::error!("tell routing failed: {e:?}");
324                    HostOperationResult::Error(actr_error_to_code(&e))
325                }
326            }
327        }
328
329        HostOperation::Discover(req) => {
330            match ctx.discover_route_candidate(&req.target_type).await {
331                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
332                Err(e) => {
333                    tracing::error!("discover failed: {e:?}");
334                    HostOperationResult::Error(actr_error_to_code(&e))
335                }
336            }
337        }
338
339        HostOperation::RegisterStream(req) => {
340            let stream_id = req.stream_id;
341            let callback_ctx = ctx.clone();
342            let callback_workload_dispatch = workload_dispatch.clone();
343            match ctx
344                .register_stream(stream_id, move |chunk: DataStream, sender| {
345                    let ctx_for_executor = callback_ctx.clone();
346                    let workload_dispatch = callback_workload_dispatch.clone();
347                    Box::pin(async move {
348                        let invocation = crate::workload::InvocationContext {
349                            self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
350                            caller_id: Some(sender.clone()),
351                            request_id: format!(
352                                "data-stream:{}:{}",
353                                chunk.stream_id, chunk.sequence
354                            ),
355                        };
356                        let call_executor: crate::workload::HostAbiFn =
357                            std::sync::Arc::new(move |pending| {
358                                let ctx = ctx_for_executor.clone();
359                                Box::pin(async move {
360                                    stream_callback_host_operation_handler(ctx, pending).await
361                                })
362                            });
363                        let mut guard = workload_dispatch.lock().await;
364                        guard
365                            .dispatch_data_stream(chunk, sender, invocation, &call_executor)
366                            .await
367                    })
368                })
369                .await
370            {
371                Ok(()) => HostOperationResult::Done,
372                Err(e) => {
373                    tracing::error!("register_stream failed: {e:?}");
374                    HostOperationResult::Error(actr_error_to_code(&e))
375                }
376            }
377        }
378
379        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
380            Ok(()) => HostOperationResult::Done,
381            Err(e) => {
382                tracing::error!("unregister_stream failed: {e:?}");
383                HostOperationResult::Error(actr_error_to_code(&e))
384            }
385        },
386
387        HostOperation::SendDataStream(req) => {
388            let dest = match decode_dest(&req.dest) {
389                Some(d) => d,
390                None => {
391                    tracing::error!("send_data_stream: dest decode failed");
392                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
393                }
394            };
395            let payload_type = match PayloadType::try_from(req.payload_type) {
396                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
397                    PayloadType::try_from(req.payload_type).expect("checked payload type")
398                }
399                Ok(other) => {
400                    tracing::error!(?other, "send_data_stream: invalid stream payload type");
401                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
402                }
403                Err(_) => {
404                    tracing::error!(
405                        payload_type = req.payload_type,
406                        "send_data_stream: unknown payload type"
407                    );
408                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
409                }
410            };
411            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
412                Ok(()) => HostOperationResult::Done,
413                Err(e) => {
414                    tracing::error!("send_data_stream failed: {e:?}");
415                    HostOperationResult::Error(actr_error_to_code(&e))
416                }
417            }
418        }
419    }
420}
421
422fn lifecycle_invocation(
423    actor_id: &ActrId,
424    request_id: &'static str,
425) -> crate::workload::InvocationContext {
426    crate::workload::InvocationContext {
427        self_id: actor_id.clone(),
428        caller_id: None,
429        request_id: request_id.to_string(),
430    }
431}
432
433pub(crate) fn lifecycle_host_abi(
434    ctx: crate::context::RuntimeContext,
435    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
436) -> crate::workload::HostAbiFn {
437    std::sync::Arc::new(move |pending| {
438        let ctx = ctx.clone();
439        let workload_dispatch = workload_dispatch.clone();
440        Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
441    })
442}
443
444async fn stream_callback_host_operation_handler(
445    ctx: crate::context::RuntimeContext,
446    pending: crate::workload::HostOperation,
447) -> crate::workload::HostOperationResult {
448    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
449    use actr_framework::guest::dynclib_abi::code as abi_code;
450    use actr_framework::{Context as _, Dest};
451    use actr_protocol::PayloadType;
452
453    fn actr_error_to_code(err: &ActrError) -> i32 {
454        match err {
455            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
456            _ => abi_code::GENERIC_ERROR,
457        }
458    }
459
460    match pending {
461        HostOperation::CallRaw(req) => {
462            match ctx
463                .call_raw(
464                    &Dest::Actor(req.target),
465                    req.route_key,
466                    PayloadType::RpcReliable,
467                    bytes::Bytes::from(req.payload),
468                    30_000,
469                )
470                .await
471            {
472                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
473                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
474            }
475        }
476        HostOperation::Call(req) => {
477            let dest = match decode_dest(&req.dest) {
478                Some(d) => d,
479                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
480            };
481            match ctx
482                .call_raw(
483                    &dest,
484                    req.route_key,
485                    PayloadType::RpcReliable,
486                    bytes::Bytes::from(req.payload),
487                    30_000,
488                )
489                .await
490            {
491                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
492                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
493            }
494        }
495        HostOperation::Tell(req) => {
496            let dest = match decode_dest(&req.dest) {
497                Some(d) => d,
498                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
499            };
500            match ctx
501                .tell_raw(
502                    &dest,
503                    req.route_key,
504                    PayloadType::RpcReliable,
505                    bytes::Bytes::from(req.payload),
506                )
507                .await
508            {
509                Ok(()) => HostOperationResult::Done,
510                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
511            }
512        }
513        HostOperation::Discover(req) => {
514            match ctx.discover_route_candidate(&req.target_type).await {
515                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
516                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
517            }
518        }
519        HostOperation::RegisterStream(_) => {
520            tracing::error!("register_stream from inside a stream callback is not supported");
521            HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
522        }
523        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
524            Ok(()) => HostOperationResult::Done,
525            Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
526        },
527        HostOperation::SendDataStream(req) => {
528            let dest = match decode_dest(&req.dest) {
529                Some(d) => d,
530                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
531            };
532            let payload_type = match PayloadType::try_from(req.payload_type) {
533                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
534                    PayloadType::try_from(req.payload_type).expect("checked payload type")
535                }
536                Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
537            };
538            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
539                Ok(()) => HostOperationResult::Done,
540                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
541            }
542        }
543    }
544}
545
546/// Map `ActrError` to a stable, unique numeric code for the wire
547/// `ErrorResponse`.  Each variant has a distinct code so
548/// `wire_code_to_actr_error` can reconstruct the exact variant on the
549/// receiving side.
550///
551/// Code allocation (100xx namespace to avoid collisions with HTTP):
552///
553/// | code  | variant            |
554/// |-------|--------------------|
555/// | 10001 | Unavailable        |
556/// | 10002 | TimedOut           |
557/// | 10003 | NotFound           |
558/// | 10004 | PermissionDenied   |
559/// | 10005 | InvalidArgument    |
560/// | 10006 | UnknownRoute       |
561/// | 10007 | DependencyNotFound |
562/// | 10008 | DecodeFailure      |
563/// | 10009 | NotImplemented     |
564/// | 10010 | Internal           |
565pub(crate) fn protocol_error_to_code(err: &ActrError) -> u32 {
566    match err {
567        ActrError::Unavailable(_) => 10001,
568        // ConnectionNotReady is a local send-preflight error and should not
569        // normally cross the wire; if it does, collapse it to Unavailable
570        // (same as the legacy 503 mapping did).
571        ActrError::ConnectionNotReady(_) => 10001,
572        ActrError::TimedOut => 10002,
573        ActrError::NotFound(_) => 10003,
574        ActrError::PermissionDenied(_) => 10004,
575        ActrError::InvalidArgument(_) => 10005,
576        ActrError::UnknownRoute(_) => 10006,
577        ActrError::DependencyNotFound { .. } => 10007,
578        ActrError::DecodeFailure(_) => 10008,
579        ActrError::NotImplemented(_) => 10009,
580        ActrError::Internal(_) => 10010,
581    }
582}
583
584/// Reconstruct an `ActrError` from a wire code + message pair.
585///
586/// This is the inverse of `protocol_error_to_code`.  Unknown codes fall
587/// back to `ActrError::Unavailable` to avoid silent data loss.
588pub(crate) fn wire_code_to_actr_error(code: u32, message: String) -> ActrError {
589    match code {
590        10001 => ActrError::Unavailable(message),
591        10002 => ActrError::TimedOut,
592        10003 => ActrError::NotFound(message),
593        10004 => ActrError::PermissionDenied(message),
594        10005 => ActrError::InvalidArgument(message),
595        10006 => ActrError::UnknownRoute(message),
596        10007 => ActrError::DependencyNotFound {
597            service_name: String::new(),
598            message,
599        },
600        10008 => ActrError::DecodeFailure(message),
601        10009 => ActrError::NotImplemented(message),
602        10010 => ActrError::Internal(message),
603        // Legacy HTTP-ish codes emitted before this scheme was introduced,
604        // or any unknown future code: treat as Unavailable.
605        _ => ActrError::Unavailable(format!("rpc error {code}: {message}")),
606    }
607}
608
609impl Inner {
610    #[allow(dead_code)]
611    pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
612        self.package_manifest.as_ref()
613    }
614
615    /// Network event processing loop (background task)
616    ///
617    /// # Responsibilities
618    /// - Receive network events from Channel
619    /// - Delegate to NetworkEventProcessor for handling
620    /// - Record processing time and send results
621    async fn network_event_loop(
622        event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>,
623        event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
624        shutdown_token: CancellationToken,
625    ) {
626        crate::lifecycle::network_event::run_network_event_reconciler(
627            event_rx,
628            event_processor,
629            shutdown_token,
630        )
631        .await;
632    }
633
634    fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
635        if timeout_ms > 0 {
636            Duration::from_millis(timeout_ms as u64)
637        } else {
638            DEDUP_TTL
639        }
640    }
641
642    async fn wait_for_inflight_duplicate(
643        mut waiter: DedupWaiter,
644        timeout: Duration,
645    ) -> ActorResult<Bytes> {
646        let wait_for_result = async {
647            loop {
648                if let Some(result) = waiter.borrow().clone() {
649                    return result;
650                }
651
652                if waiter.changed().await.is_err() {
653                    if let Some(result) = waiter.borrow().clone() {
654                        return result;
655                    }
656                    return Err(ActrError::Unavailable(
657                        "duplicate request result unavailable".to_string(),
658                    ));
659                }
660            }
661        };
662
663        match tokio::time::timeout(timeout, wait_for_result).await {
664            Ok(result) => result,
665            Err(_) => Err(ActrError::Unavailable(format!(
666                "duplicate request in-flight timed out after {}ms",
667                timeout.as_millis()
668            ))),
669        }
670    }
671
672    /// - Single-hop calls: effectively identical
673    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
674    #[cfg_attr(
675        feature = "opentelemetry",
676        tracing::instrument(
677            skip_all,
678            name = "ActrNode.handle_incoming",
679            fields(
680                actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
681                route_key = %envelope.route_key,
682                request_id = %envelope.request_id,
683            )
684        )
685    )]
686    pub async fn handle_incoming(
687        &self,
688        envelope: RpcEnvelope,
689        caller_id: Option<&ActrId>,
690    ) -> ActorResult<Bytes> {
691        // Log received message
692        if let Some(caller) = caller_id {
693            tracing::debug!(
694                "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
695                envelope.route_key,
696                caller,
697                envelope.request_id
698            );
699        } else {
700            tracing::debug!(
701                "📨 Handling incoming message: route_key={}, request_id={}",
702                envelope.route_key,
703                envelope.request_id
704            );
705        }
706
707        // 0. Get actor_id early for ACL check
708        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
709            ActrError::Internal(
710                "Actor ID not set - node must be started before handling messages".to_string(),
711            )
712        })?;
713
714        // 0.1. ACL Permission Check (before processing message)
715        let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
716            .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
717
718        if !acl_allowed {
719            tracing::warn!(
720                severity = 5,
721                error_category = "acl_denied",
722                request_id = %envelope.request_id,
723                route_key = %envelope.route_key,
724                caller = %caller_id
725                    .map(|c| c.to_string())
726                    .unwrap_or_else(|| "<none>".to_string()),
727                "🚫 ACL: Permission denied"
728            );
729
730            return Err(ActrError::PermissionDenied(format!(
731                "ACL denied: {} is not allowed to call {}",
732                caller_id
733                    .map(|c| c.to_string())
734                    .unwrap_or_else(|| "<unknown>".to_string()),
735                actor_id
736            )));
737        }
738
739        // 0.2. Deduplication: return cached response for retried request_ids
740        let outcome = {
741            self.dedup_state
742                .lock()
743                .await
744                .check_or_mark(&envelope.request_id)
745        };
746        match outcome {
747            DedupOutcome::Fresh => {} // proceed normally
748            DedupOutcome::InFlight(waiter) => {
749                tracing::debug!(
750                    request_id = %envelope.request_id,
751                    route_key = %envelope.route_key,
752                    "duplicate request in-flight; waiting for original result"
753                );
754                return Self::wait_for_inflight_duplicate(
755                    waiter,
756                    Self::duplicate_wait_timeout(envelope.timeout_ms),
757                )
758                .await;
759            }
760            DedupOutcome::Duplicate(cached) => {
761                tracing::debug!(
762                    request_id = %envelope.request_id,
763                    route_key = %envelope.route_key,
764                    "♻️ returning cached response for duplicate request_id"
765                );
766                return cached;
767            }
768        }
769
770        // 1. Create Context with caller_id from transport layer
771        let credential_state = self.credential_state.clone().ok_or_else(|| {
772            ActrError::Internal(
773                "Credential not set - node must be started before handling messages".to_string(),
774            )
775        })?;
776        let ctx = self.make_runtime_context(
777            actor_id,
778            caller_id, // caller_id from transport layer (MessageRecord.from)
779            &envelope.request_id,
780            &credential_state.credential().await,
781        );
782
783        // 2. Dispatch
784        let dispatch_ctx = crate::workload::InvocationContext {
785            self_id: actor_id.clone(),
786            caller_id: caller_id.cloned(),
787            request_id: envelope.request_id.clone(),
788        };
789        let ctx_for_executor = ctx.clone();
790        let workload_for_executor = self.workload_dispatch.clone();
791        let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
792            let ctx = ctx_for_executor.clone();
793            let workload_dispatch = workload_for_executor.clone();
794            Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
795        });
796
797        let mut guard = self.workload_dispatch.lock().await;
798        let result = guard
799            .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
800            .await;
801
802        match &result {
803            Ok(_) => tracing::debug!(
804                request_id = %envelope.request_id,
805                route_key = %envelope.route_key,
806                "✅ Message handled successfully"
807            ),
808            Err(e) => tracing::error!(
809                severity = 6,
810                error_category = "handler_error",
811                request_id = %envelope.request_id,
812                route_key = %envelope.route_key,
813                "❌ Message handling failed: {:?}", e
814            ),
815        }
816
817        // 3. Store completed result in dedup cache before returning
818        self.dedup_state
819            .lock()
820            .await
821            .complete(&envelope.request_id, result.clone());
822
823        result
824    }
825
826    /// Build a new `Inner` from config and runtime workload.
827    ///
828    /// This is the internal constructor behind the public node builders and
829    /// Hyper package attach helpers.
830    pub(crate) async fn build(
831        config: actr_config::RuntimeConfig,
832        workload: crate::workload::Workload,
833        package_manifest: Option<actr_pack::PackageManifest>,
834        packaged_lock: Option<actr_config::lock::LockFile>,
835        mailbox_backpressure_threshold: usize,
836        credential_expiry_warning: Duration,
837    ) -> ActorResult<Self> {
838        use crate::outbound::{Gate, HostGate};
839        use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
840
841        tracing::info!("🚀 Initializing ActrNode");
842
843        // Initialize Mailbox
844        let mailbox_path = config
845            .mailbox_path
846            .as_ref()
847            .map(|p| p.to_string_lossy().to_string())
848            .unwrap_or_else(|| ":memory:".to_string());
849
850        tracing::info!("📂 Mailbox database path: {}", mailbox_path);
851
852        let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
853            actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
854                .await
855                .map_err(|e| {
856                    actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
857                })?,
858        );
859
860        // Initialize Dead Letter Queue
861        let dlq_path = if mailbox_path == ":memory:" {
862            ":memory:".to_string()
863        } else {
864            format!("{mailbox_path}.dlq")
865        };
866
867        let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
868            actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
869                .await
870                .map_err(|e| {
871                    actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
872                })?,
873        );
874        tracing::info!("✅ Dead Letter Queue initialized");
875
876        // Initialize signaling client
877        let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
878            Some("answer".to_string())
879        } else {
880            None
881        };
882
883        let signaling_config = SignalingConfig {
884            server_url: config.signaling_url.clone(),
885            connection_timeout: 30,
886            heartbeat_interval: 30,
887            reconnect_config: ReconnectConfig::default(),
888            auth_config: None,
889            webrtc_role,
890        };
891
892        let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
893        client.start_reconnect_manager();
894        let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
895
896        // Initialize inproc infrastructure (Shell ↔ Guest)
897        let shell_to_workload = Arc::new(HostTransport::new());
898        let workload_to_shell = Arc::new(HostTransport::new());
899        let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
900
901        let data_stream_registry = Arc::new(DataStreamRegistry::new());
902        let media_frame_registry = Arc::new(MediaFrameRegistry::new());
903
904        tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
905
906        let actr_lock = if let Some(lock) = packaged_lock {
907            tracing::info!(
908                "📋 Loaded packaged manifest.lock.toml with {} dependencies",
909                lock.dependencies.len()
910            );
911            Some(Arc::new(lock))
912        } else {
913            tracing::warn!(
914                "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
915            );
916            None
917        };
918
919        tracing::info!("✅ ActrNode initialized");
920
921        Ok(Self {
922            config,
923            mailbox,
924            dlq,
925            inproc_gate,
926            outproc_gate: None, // Populated in start() once WebRTC / PeerGate is ready.
927            data_stream_registry,
928            media_frame_registry,
929            signaling_client,
930            actor_id: None,
931            credential_state: None,
932            webrtc_coordinator: None,
933            peer_transport: None,
934            webrtc_gate: None,
935            websocket_gate: None,
936            shell_to_workload: Some(shell_to_workload),
937            workload_to_shell: Some(workload_to_shell),
938            shutdown_token: CancellationToken::new(),
939            actr_lock,
940            network_event_rx: None,
941            network_event_debounce_config: None,
942            dedup_state: Arc::new(Mutex::new(DedupState::new())),
943            package_manifest,
944            preregistered_credential: None,
945            discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
946                std::collections::HashMap::new(),
947            )),
948            workload_dispatch: Arc::new(Mutex::new(workload)),
949            hook_observer: None,
950            mailbox_backpressure_threshold,
951            credential_expiry_warning,
952        })
953    }
954
955    /// Snapshot the current runtime handles into a `BootstrapContextBuilder`.
956    ///
957    /// The returned builder is cloned into long-lived hook closures and into
958    /// `ActrRefShared` so those paths can materialize bootstrap contexts
959    /// without retaining a reference back to `Inner`. The snapshot freezes
960    /// `outproc_gate` and `actr_lock` at call time — callers that want to
961    /// observe a later-initialized `outproc_gate` must rebuild.
962    pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
963        BootstrapContextBuilder::new(
964            self.inproc_gate.clone(),
965            self.outproc_gate.clone(),
966            self.data_stream_registry.clone(),
967            self.media_frame_registry.clone(),
968            self.signaling_client.clone(),
969            self.actr_lock.clone(),
970            self.discovered_ws_addresses.clone(),
971        )
972    }
973
974    /// Build a `RuntimeContext` for the per-request dispatch path.
975    ///
976    /// Unlike `BootstrapContextBuilder::build_bootstrap`, this carries the
977    /// envelope's caller identity and request id through into the context.
978    pub(crate) fn make_runtime_context(
979        &self,
980        self_id: &ActrId,
981        caller_id: Option<&ActrId>,
982        request_id: &str,
983        credential: &AIdCredential,
984    ) -> RuntimeContext {
985        RuntimeContext::new(
986            self_id.clone(),
987            caller_id.cloned(),
988            request_id.to_string(),
989            self.inproc_gate.clone(),
990            self.outproc_gate.clone(),
991            self.data_stream_registry.clone(),
992            self.media_frame_registry.clone(),
993            self.signaling_client.clone(),
994            credential.clone(),
995            self.actr_lock.clone(),
996            self.discovered_ws_addresses.clone(),
997        )
998    }
999
1000    /// Create network event processing infrastructure (called on demand, before `start()`).
1001    ///
1002    /// # Parameters
1003    /// - `debounce_ms`: Debounce window in milliseconds. If 0, no debounce.
1004    ///
1005    /// # Panics
1006    /// Panics if called more than once.
1007    pub fn create_network_event_handle(
1008        &mut self,
1009        debounce_ms: u64,
1010    ) -> crate::lifecycle::NetworkEventHandle {
1011        if self.network_event_rx.is_some() {
1012            panic!("create_network_event_handle() can only be called once");
1013        }
1014
1015        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
1016
1017        let debounce_config = if debounce_ms > 0 {
1018            Some(crate::lifecycle::network_event::DebounceConfig {
1019                window: std::time::Duration::from_millis(debounce_ms),
1020            })
1021        } else {
1022            None
1023        };
1024
1025        self.network_event_rx = Some(event_rx);
1026        self.network_event_debounce_config = debounce_config;
1027
1028        tracing::info!(
1029            debounce_ms,
1030            channel_capacity = 100_u64,
1031            "network_event.node.handle_created"
1032        );
1033
1034        crate::lifecycle::NetworkEventHandle::new(event_tx)
1035    }
1036
1037    /// Attach a credential already issued by AIS so that `start()` can skip
1038    /// the signaling registration step.
1039    ///
1040    /// Called by the Hyper layer between `Hyper::register()` and `Hyper::start()`.
1041    pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
1042        tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
1043        self.preregistered_credential = Some(register_ok);
1044    }
1045
1046    /// Start the system
1047    pub async fn start(mut self) -> ActorResult<ActrRef> {
1048        tracing::info!("🚀 Starting ActrNode");
1049        tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1050
1051        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1052        // 1. Build RegisterRequest
1053        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1054        // Get ActrType from configuration
1055        let actr_type = self.config.actr_type().clone();
1056        tracing::info!("📋 Actor type: {}", actr_type);
1057
1058        // ServiceSpec is derived by the Hyper layer from the verified package
1059        // (see `service_spec::calculate_service_spec_from_package`). The raw
1060        // ActrNode::start() path has no package context and always sends None
1061        // on its own RegisterRequest; callers that need a spec must go
1062        // through `Hyper::register()`.
1063        let service_spec = None;
1064
1065        // If a WebSocket listen port is configured, build the advertised ws:// address
1066        // to register with the signaling server so clients can discover it.
1067        let ws_address = if let Some(port) = self.config.websocket_listen_port {
1068            let host = self
1069                .config
1070                .websocket_advertised_host
1071                .as_deref()
1072                .unwrap_or("127.0.0.1");
1073            Some(format!("ws://{}:{}", host, port))
1074        } else {
1075            None
1076        };
1077
1078        if let Some(ref addr) = ws_address {
1079            tracing::info!(
1080                "📡 Advertising WebSocket address to signaling server: {}",
1081                addr
1082            );
1083        }
1084
1085        let register_request = RegisterRequest {
1086            actr_type: actr_type.clone(),
1087            realm: self.config.realm,
1088            service_spec,
1089            acl: self.config.acl.clone(),
1090            service: None,
1091            ws_address,
1092            auth_mode: Some(RegisterAuthMode::Linked as i32),
1093            ..Default::default()
1094        };
1095
1096        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1097        // 1. Obtain registration info (Hyper pre-injected or AIS HTTP)
1098        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1099        let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1100            tracing::info!(
1101                "Using Hyper pre-injected registration credential; skipping AIS registration"
1102            );
1103            injected
1104        } else {
1105            let ais_endpoint = &self.config.ais_endpoint;
1106            tracing::info!(
1107                ais_endpoint = %ais_endpoint,
1108                "Registering actor with AIS via HTTP"
1109            );
1110            let mut ais = AisClient::new(ais_endpoint);
1111            if let Some(ref secret) = self.config.realm_secret {
1112                ais = ais.with_realm_secret(secret);
1113            }
1114            let resp = ais
1115                .register_linked(register_request.clone())
1116                .await
1117                .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1118            match resp.result {
1119                Some(register_response::Result::Success(ok)) => {
1120                    tracing::info!("✅ AIS HTTP registration successful");
1121                    ok
1122                }
1123                Some(register_response::Result::Error(error)) => {
1124                    tracing::error!(
1125                        severity = 10,
1126                        error_category = "registration_error",
1127                        error_code = error.code,
1128                        "❌ AIS registration failed: code={}, message={}",
1129                        error.code,
1130                        error.message
1131                    );
1132                    return Err(ActrError::Unavailable(format!(
1133                        "AIS registration rejected: {} (code: {})",
1134                        error.message, error.code
1135                    )));
1136                }
1137                None => {
1138                    tracing::error!(
1139                        severity = 10,
1140                        error_category = "registration_error",
1141                        "❌ AIS registration response missing result"
1142                    );
1143                    return Err(ActrError::Unavailable(
1144                        "Invalid AIS registration response: missing result".to_string(),
1145                    ));
1146                }
1147            }
1148        };
1149
1150        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1151        // 3. Set credential on signaling client, then connect signaling WS
1152        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1153        // The signaling server requires credential params in the WS URL for
1154        // authentication. We must set actor_id + credential BEFORE connecting
1155        // so that build_url_with_identity() includes them in the query string.
1156        let pre_connect_credential_state = {
1157            let actor_id = register_ok.actr_id.clone();
1158            let credential_state = CredentialState::new(
1159                register_ok.credential.clone(),
1160                register_ok.credential_expires_at,
1161                Some(register_ok.turn_credential.clone()),
1162            );
1163            self.signaling_client.set_actor_id(actor_id).await;
1164            self.signaling_client
1165                .set_credential_state(credential_state.clone())
1166                .await;
1167            credential_state
1168        };
1169
1170        // Install the signaling-side hook callback so that
1171        // SignalingConnectStart / Connected / Disconnected events flow
1172        // through the framework tracing defaults and into a
1173        // user-installed observer. Done BEFORE connect() so the initial
1174        // attempt produces a SignalingConnectStart event.
1175        {
1176            let actor_id = register_ok.actr_id.clone();
1177            let credential_state = pre_connect_credential_state.clone();
1178            // Snapshot at this point — outproc_gate is still None here, so
1179            // signaling-event contexts will carry None for outproc_gate
1180            // (matching the pre-existing behavior prior to B13 refactor).
1181            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1182            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1183                let snapshot = ctx_builder_snapshot.clone();
1184                let actor_id = actor_id.clone();
1185                let credential_state = credential_state.clone();
1186                Box::pin(async move {
1187                    Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1188                })
1189            });
1190            let cb = crate::lifecycle::hooks::build_hook_callback(
1191                self.hook_observer.clone(),
1192                ctx_builder,
1193            );
1194            self.signaling_client.set_hook_callback(cb);
1195        }
1196
1197        tracing::info!("📡 Connecting to signaling server (with credential)");
1198        self.signaling_client
1199            .connect()
1200            .await
1201            .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1202        tracing::info!("✅ Connected to signaling server");
1203
1204        // Collect background task handles so they can be managed by ActrRefShared later.
1205        let mut task_handles = Vec::new();
1206
1207        // Node-level hook callback, built inside the registration
1208        // setup block below and published back out into this wider
1209        // scope so the mailbox backpressure watchdog can subscribe.
1210        let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1211
1212        {
1213            let actor_id = register_ok.actr_id;
1214            let credential = register_ok.credential;
1215
1216            tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1217            tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1218            tracing::info!(
1219                "💓 Signaling heartbeat interval: {} seconds",
1220                register_ok.signaling_heartbeat_interval_secs
1221            );
1222
1223            // TurnCredential is a required field; should always be present under normal registration.
1224            tracing::debug!("TurnCredential received, TURN authentication ready");
1225
1226            if let Some(expires_at) = &register_ok.credential_expires_at {
1227                tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1228            }
1229
1230            // Store ActrId and credential state
1231            self.actor_id = Some(actor_id.clone());
1232            let credential_state = CredentialState::new(
1233                credential,
1234                register_ok.credential_expires_at,
1235                Some(register_ok.turn_credential.clone()),
1236            );
1237            self.credential_state = Some(credential_state.clone());
1238
1239            // Build the node-level lifecycle hook callback once: it is
1240            // reused for the initial `on_credential_renewed`, handed to
1241            // the heartbeat task for subsequent credential events, and
1242            // handed to the mailbox backpressure watchdog for
1243            // `on_mailbox_backpressure` on rising-edge crossings.
1244            //
1245            // The signaling layer already has its own callback installed
1246            // above — this second callback only carries credential and
1247            // mailbox-backpressure events, so no overlap with the
1248            // signaling-event plumbing.
1249            node_hook_callback =
1250                {
1251                    let actor_id_for_hook = actor_id.clone();
1252                    let credential_state_for_hook = credential_state.clone();
1253                    // Snapshot at this point — outproc_gate is still None
1254                    // here; credential / mailbox hook contexts inherit that
1255                    // and therefore cannot issue Dest::Actor(_) calls (same
1256                    // behavior as before B13 refactor).
1257                    let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1258                    let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1259                        Arc::new(move || {
1260                            let snapshot = ctx_builder_snapshot.clone();
1261                            let actor_id = actor_id_for_hook.clone();
1262                            let credential_state = credential_state_for_hook.clone();
1263                            Box::pin(async move {
1264                                Some(snapshot.build_bootstrap(
1265                                    &actor_id,
1266                                    &credential_state.credential().await,
1267                                ))
1268                            })
1269                        });
1270                    Some(crate::lifecycle::hooks::build_hook_callback(
1271                        self.hook_observer.clone(),
1272                        ctx_builder,
1273                    ))
1274                };
1275
1276            // Fire `on_credential_renewed` at initial registration: the
1277            // credential is considered "renewed" from "nothing" to the
1278            // value just issued by AIS. Subsequent renewals fire the
1279            // same hook from `lifecycle::heartbeat`.
1280            if let Some(expires_at) = &register_ok.credential_expires_at {
1281                let new_expiry = std::time::UNIX_EPOCH
1282                    + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1283                if let Some(cb) = node_hook_callback.as_ref() {
1284                    cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1285                } else {
1286                    tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1287                }
1288            }
1289
1290            // Note: actor_id and credential_state were already set on signaling_client
1291            // before connect (step 3 above), so reconnect URLs already carry correct auth.
1292
1293            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1294            // 1.3. Inproc transports were filled in during `build()`; nothing
1295            //      to stage here now that ContextFactory has been removed.
1296            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1297            tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1298
1299            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1300            // 1.5. Create WebRTC infrastructure
1301            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1302            tracing::info!("🌐 Initializing WebRTC infrastructure");
1303
1304            let media_frame_registry = self.media_frame_registry.clone();
1305
1306            // Create WebRtcCoordinator
1307            let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1308                actor_id.clone(),
1309                credential_state.clone(),
1310                self.signaling_client.clone(),
1311                self.config.webrtc.clone(),
1312                media_frame_registry,
1313            ));
1314
1315            // Install the WebRTC hook callback — fires
1316            // WebRtcConnectStart / Connected (with relayed info) /
1317            // Disconnected HookEvents on every peer state change.
1318            {
1319                let actor_id_for_hook = actor_id.clone();
1320                let credential_state_for_hook = credential_state.clone();
1321                // Snapshot before outproc_gate is wired up (just below). This
1322                // preserves the pre-refactor behavior where WebRTC-event
1323                // hook contexts carry outproc_gate = None.
1324                let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1325                let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1326                    Arc::new(move || {
1327                        let snapshot = ctx_builder_snapshot.clone();
1328                        let actor_id = actor_id_for_hook.clone();
1329                        let credential_state = credential_state_for_hook.clone();
1330                        Box::pin(async move {
1331                            Some(
1332                                snapshot.build_bootstrap(
1333                                    &actor_id,
1334                                    &credential_state.credential().await,
1335                                ),
1336                            )
1337                        })
1338                    });
1339                let cb = crate::lifecycle::hooks::build_hook_callback(
1340                    self.hook_observer.clone(),
1341                    ctx_builder,
1342                );
1343                coordinator.set_hook_callback(cb);
1344            }
1345
1346            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1347            // 1.6. Create PeerTransport + PeerGate (new architecture)
1348            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1349            tracing::info!("🏗️  Creating PeerTransport with WebRTC support");
1350
1351            // Pre-allocate the pending-requests map so it can be shared between
1352            // DefaultWireBuilder (for outbound WS response reader tasks) and
1353            // PeerGate (for request/response matching).
1354            let pending_requests: crate::outbound::PendingRequestsMap =
1355                Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
1356
1357            // Create DefaultWireBuilder with WebRTC coordinator
1358            use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1359
1360            // WebSocket channel always enabled: target ws:// address is fully discovered at runtime
1361            // Direct-connect mode: encode local node ActrId as hex, sent as X-Actr-Node-Id
1362            let local_id_hex = hex::encode(actor_id.encode_to_vec());
1363            let wire_builder_config = DefaultWireBuilderConfig {
1364                local_id_hex,
1365                enable_webrtc: true,
1366                enable_websocket: true,
1367                // Share the discovered_ws_addresses map so that post-discovery calls
1368                // can use the signaling-provided ws:// URL for this actor node.
1369                discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1370                // Pass credential_state so outbound WS handshake carries X-Actr-Credential,
1371                // enabling peer WebSocketGate to perform Ed25519 signature verification.
1372                credential_state: Some(credential_state.clone()),
1373                // Pass pending_requests so outbound WS connections spawn reader tasks
1374                // to deliver server responses back to `send_request_with_type` futures.
1375                pending_requests: Some(pending_requests.clone()),
1376            };
1377            let wire_builder = Arc::new(DefaultWireBuilder::new(
1378                Some(coordinator.clone()),
1379                wire_builder_config,
1380            ));
1381
1382            // Create PeerTransport
1383            use crate::transport::PeerTransport;
1384            let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1385            self.peer_transport = Some(transport_manager.clone());
1386
1387            // Create PeerGate with the pre-allocated pending_requests map and WebRTC coordinator.
1388            use crate::outbound::PeerGate;
1389            let outproc_gate = Arc::new(PeerGate::with_pending_requests(
1390                transport_manager,
1391                Some(coordinator.clone()),
1392                pending_requests.clone(),
1393            ));
1394            let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1395            tracing::info!("PeerTransport + PeerGate initialized");
1396
1397            let data_stream_registry = self.data_stream_registry.clone();
1398
1399            // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1400            let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1401                coordinator.clone(),
1402                pending_requests,
1403                data_stream_registry.clone(),
1404            ));
1405            // Set local_id
1406            gate.set_local_id(actor_id.clone()).await;
1407            tracing::info!(
1408                "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1409            );
1410
1411            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1412            // 1.7. Wire the outproc gate into Inner so subsequent
1413            //      `make_runtime_context` / `bootstrap_ctx_builder` calls
1414            //      observe it. All per-request contexts created by
1415            //      `handle_incoming` go through this field live.
1416            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1417            tracing::info!("🔧 Wiring outproc_gate into node");
1418            self.outproc_gate = Some(outproc_gate_enum);
1419            tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1420
1421            // Save references
1422            self.webrtc_coordinator = Some(coordinator.clone());
1423            self.webrtc_gate = Some(gate.clone());
1424            tracing::info!("✅ WebRTC infrastructure initialized");
1425
1426            // Fire `on_start` once the runtime context can see the initialized
1427            // gates, before starting request-accepting/background loops. Its
1428            // Err/panic aborts Node::start.
1429            {
1430                let startup_ctx = self
1431                    .bootstrap_ctx_builder()
1432                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1433                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1434                let call_executor =
1435                    lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1436                let mut workload = self.workload_dispatch.lock().await;
1437                crate::lifecycle::hooks::call_lifecycle_hook(
1438                    "on_start",
1439                    workload.on_start(startup_ctx, invocation, &call_executor),
1440                )
1441                .await?;
1442            }
1443
1444            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1445            // 1.7.6. WebSocket Server (direct-connect mode, optional)
1446            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1447            if let Some(listen_port) = self.config.websocket_listen_port {
1448                tracing::info!(
1449                    "🔌 WebSocket direct-connect mode enabled, binding port {}",
1450                    listen_port
1451                );
1452                use crate::key_cache::AisKeyCache;
1453                use crate::wire::websocket::gate::WsAuthContext;
1454                use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1455
1456                // Build AisKeyCache and seed it with the signing key from the registration response
1457                let ais_key_cache = AisKeyCache::new();
1458                if !register_ok.signing_pubkey.is_empty() {
1459                    match ais_key_cache
1460                        .seed(register_ok.signing_key_id, &register_ok.signing_pubkey)
1461                        .await
1462                    {
1463                        Ok(()) => tracing::info!(
1464                            key_id = register_ok.signing_key_id,
1465                            "🔑 AisKeyCache seeded from RegisterOk"
1466                        ),
1467                        Err(e) => tracing::warn!(
1468                            key_id = register_ok.signing_key_id,
1469                            error = ?e,
1470                            "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1471                        ),
1472                    }
1473                } else {
1474                    tracing::warn!(
1475                        "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1476                    );
1477                }
1478
1479                let auth_ctx = WsAuthContext {
1480                    ais_key_cache,
1481                    actor_id: actor_id.clone(),
1482                    credential_state: credential_state.clone(),
1483                    signaling_client: self.signaling_client.clone(),
1484                };
1485
1486                match WebSocketServer::bind(listen_port).await {
1487                    Ok((ws_server, conn_rx)) => {
1488                        ws_server.start(self.shutdown_token.clone());
1489                        let ws_gate = Arc::new(WebSocketGate::new(
1490                            conn_rx,
1491                            outproc_gate.get_pending_requests(),
1492                            data_stream_registry.clone(),
1493                            Some(auth_ctx),
1494                        ));
1495
1496                        // Install the WebSocket peer-lifecycle hook.
1497                        {
1498                            let actor_id_for_hook = actor_id.clone();
1499                            let credential_state_for_hook = credential_state.clone();
1500                            // Snapshot taken after outproc_gate is live: ws
1501                            // peer-lifecycle hook contexts can issue
1502                            // Dest::Actor(_) calls.
1503                            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1504                            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1505                                Arc::new(move || {
1506                                    let snapshot = ctx_builder_snapshot.clone();
1507                                    let actor_id = actor_id_for_hook.clone();
1508                                    let credential_state = credential_state_for_hook.clone();
1509                                    Box::pin(async move {
1510                                        Some(snapshot.build_bootstrap(
1511                                            &actor_id,
1512                                            &credential_state.credential().await,
1513                                        ))
1514                                    })
1515                                });
1516                            let cb = crate::lifecycle::hooks::build_hook_callback(
1517                                self.hook_observer.clone(),
1518                                ctx_builder,
1519                            );
1520                            ws_gate.set_hook_callback(cb);
1521                        }
1522
1523                        self.websocket_gate = Some(ws_gate);
1524                        tracing::info!(
1525                            "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1526                        );
1527                    }
1528                    Err(e) => {
1529                        tracing::error!(
1530                            "❌ Failed to bind WebSocket server on port {}: {:?}",
1531                            listen_port,
1532                            e
1533                        );
1534                    }
1535                }
1536            }
1537
1538            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1539            // 1.7.5. Create shared state for credential management
1540            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1541            // Shared credential state initialized above; reused across tasks
1542
1543            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1544            // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1545            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1546            {
1547                let shutdown = self.shutdown_token.clone();
1548                let client = self.signaling_client.clone();
1549                let actor_id_for_heartbeat = actor_id.clone();
1550                let credential_state_for_heartbeat = credential_state.clone();
1551                let mailbox_for_heartbeat = self.mailbox.clone();
1552                let register_request_for_heartbeat = register_request.clone();
1553                let webrtc_coordinator_for_heartbeat = self.webrtc_coordinator.clone();
1554                let webrtc_gate_for_heartbeat = self.webrtc_gate.clone();
1555
1556                // Use interval from registration response, default to 30s
1557                let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1558                let heartbeat_interval = if heartbeat_interval_secs > 0 {
1559                    Duration::from_secs(heartbeat_interval_secs as u64)
1560                } else {
1561                    Duration::from_secs(30)
1562                };
1563                let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1564                let realm_secret_for_heartbeat = self.config.realm_secret.clone();
1565                let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1566                    shutdown,
1567                    client,
1568                    actor_id_for_heartbeat,
1569                    credential_state_for_heartbeat,
1570                    mailbox_for_heartbeat,
1571                    heartbeat_interval,
1572                    register_request_for_heartbeat,
1573                    ais_endpoint_for_heartbeat,
1574                    realm_secret_for_heartbeat,
1575                    node_hook_callback.clone(),
1576                    webrtc_coordinator_for_heartbeat,
1577                    webrtc_gate_for_heartbeat,
1578                ));
1579                task_handles.push(heartbeat_handle);
1580            }
1581            tracing::info!(
1582                "✅ Heartbeat task started (interval: {}s)",
1583                register_ok.signaling_heartbeat_interval_secs
1584            );
1585
1586            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1587            // 1.8.5. Spawn network event processing loop
1588            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1589            if let Some(event_rx) = self.network_event_rx.take() {
1590                use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1591
1592                // Create DefaultNetworkEventProcessor: with a debounce config use
1593                // new_with_debounce_and_peer_transport, otherwise new_with_peer_transport
1594                let event_processor =
1595                    if let Some(config) = self.network_event_debounce_config.clone() {
1596                        Arc::new(
1597                            DefaultNetworkEventProcessor::new_with_debounce_and_peer_transport(
1598                                self.signaling_client.clone(),
1599                                self.webrtc_coordinator.clone(),
1600                                config,
1601                                self.peer_transport.clone(),
1602                            ),
1603                        )
1604                    } else {
1605                        Arc::new(DefaultNetworkEventProcessor::new_with_peer_transport(
1606                            self.signaling_client.clone(),
1607                            self.webrtc_coordinator.clone(),
1608                            self.peer_transport.clone(),
1609                        ))
1610                    };
1611
1612                let shutdown = self.shutdown_token.clone();
1613                let network_event_handle = tokio::spawn(async move {
1614                    Self::network_event_loop(event_rx, event_processor, shutdown).await;
1615                });
1616                task_handles.push(network_event_handle);
1617                tracing::info!("network_event.node.loop_started");
1618            } else {
1619                tracing::debug!("network_event.node.loop_not_started_no_handle");
1620            }
1621
1622            {
1623                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1624                // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1625                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1626                //
1627                // This task:
1628                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1629                // - Then sends UnregisterRequest via signaling client with a timeout
1630                //
1631                // NOTE: we push its JoinHandle into task_handles so it can be aborted
1632                // by ActrRefShared::Drop if needed.
1633                let shutdown = self.shutdown_token.clone();
1634                let client = self.signaling_client.clone();
1635                let actor_id_for_unreg = actor_id.clone();
1636                let credential_state_for_unreg = credential_state.clone();
1637                let webrtc_coordinator = self.webrtc_coordinator.clone();
1638
1639                let unregister_handle = tokio::spawn(async move {
1640                    // Wait for shutdown signal
1641                    shutdown.cancelled().await;
1642                    tracing::info!(
1643                        "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1644                        actor_id_for_unreg
1645                    );
1646
1647                    // 1. Close all WebRTC peer connections first (if any)
1648                    if let Some(coord) = webrtc_coordinator {
1649                        if let Err(e) = coord.close_all_peers().await {
1650                            tracing::warn!(
1651                                "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1652                                e
1653                            );
1654                        } else {
1655                            tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1656                        }
1657                    } else {
1658                        tracing::debug!(
1659                            "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1660                        );
1661                    }
1662
1663                    // 2. Then send UnregisterRequest with a timeout (e.g. 5 seconds)
1664                    let result = tokio::time::timeout(
1665                        Duration::from_secs(5),
1666                        client.send_unregister_request(
1667                            actor_id_for_unreg.clone(),
1668                            credential_state_for_unreg.credential().await,
1669                            Some("Graceful shutdown".to_string()),
1670                        ),
1671                    )
1672                    .await;
1673                    tracing::info!("UnregisterRequest result: {:?}", result);
1674                    match result {
1675                        Ok(Ok(_)) => {
1676                            tracing::info!(
1677                                "✅ UnregisterRequest sent to signaling server for Actor {}",
1678                                actor_id_for_unreg
1679                            );
1680                        }
1681                        Ok(Err(e)) => {
1682                            tracing::warn!(
1683                                "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1684                                actor_id_for_unreg,
1685                                e
1686                            );
1687                        }
1688                        Err(_) => {
1689                            tracing::warn!(
1690                                "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1691                                actor_id_for_unreg
1692                            );
1693                        }
1694                    }
1695                });
1696
1697                task_handles.push(unregister_handle);
1698            }
1699        } // end registration setup block
1700
1701        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1702        // 2. Transport layer initialization (completed via WebRTC infrastructure)
1703        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1704        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1705
1706        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1707        // 3.1 Convert to Arc (before starting background loops)
1708        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1709        // Clone actor_id before moving self into Arc
1710        let actor_id = self
1711            .actor_id
1712            .as_ref()
1713            .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1714            .clone();
1715        // Snapshot now that outproc_gate has been wired above; this builder
1716        // is shared between on_start / on_stop hooks and the ActrRefShared
1717        // handle returned to the caller.
1718        let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1719        let credential_state = self
1720            .credential_state
1721            .clone()
1722            .expect("CredentialState must be initialized in start()");
1723        let shutdown_token = self.shutdown_token.clone();
1724        let node_ref = Arc::new(self);
1725
1726        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1727        // 3.2. Register workload-level stop hook.
1728        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1729        {
1730            let node = node_ref.clone();
1731            let actor_id = actor_id.clone();
1732            let credential_state = credential_state.clone();
1733            let shutdown = shutdown_token.clone();
1734            let on_stop_handle = tokio::spawn(async move {
1735                shutdown.cancelled().await;
1736                let stop_ctx = node
1737                    .bootstrap_ctx_builder()
1738                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1739                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1740                let call_executor =
1741                    lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1742                let mut workload = node.workload_dispatch.lock().await;
1743                if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1744                    "on_stop",
1745                    workload.on_stop(stop_ctx, invocation, &call_executor),
1746                )
1747                .await
1748                {
1749                    tracing::warn!(error = %e, "workload on_stop returned Err");
1750                }
1751            });
1752            task_handles.push(on_stop_handle);
1753        }
1754
1755        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1756        // 3.5. Start WebRTC background loops
1757        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1758        tracing::info!("🚀 Starting WebRTC background loops");
1759
1760        // Start WebRtcCoordinator signaling loop
1761        if let Some(coordinator) = &node_ref.webrtc_coordinator {
1762            coordinator.clone().start().await.map_err(|e| {
1763                ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1764            })?;
1765            tracing::info!("✅ WebRtcCoordinator signaling loop started");
1766        }
1767
1768        // Start WebRtcGate message receive loop (route to Mailbox)
1769        if let Some(gate) = &node_ref.webrtc_gate {
1770            gate.start_receive_loop(node_ref.mailbox.clone())
1771                .await
1772                .map_err(|e| {
1773                    ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1774                })?;
1775            tracing::info!("✅ WebRtcGate → Mailbox routing started");
1776        }
1777
1778        // Start WebSocketGate message receive loop (route to Mailbox, direct-connect mode)
1779        if let Some(ws_gate) = &node_ref.websocket_gate {
1780            ws_gate
1781                .start_receive_loop(node_ref.mailbox.clone())
1782                .await
1783                .map_err(|e| {
1784                    ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1785                })?;
1786            tracing::info!("✅ WebSocketGate → Mailbox routing started");
1787        }
1788        tracing::info!("✅ WebRTC background loops started");
1789
1790        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1791        // 4.6. Start Inproc receive loop (Shell → Guest)
1792        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1793        if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1794            tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1795            // Start Guest receive loop (Shell → Guest REQUEST)
1796            if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1797                let node = node_ref.clone();
1798                let request_rx_lane = shell_to_workload
1799                    .get_lane(PayloadType::RpcReliable, None)
1800                    .await
1801                    .map_err(|e| {
1802                        ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1803                    })?;
1804                let response_tx = workload_to_shell.clone();
1805                let shutdown = shutdown_token.clone();
1806
1807                let inproc_handle = tokio::spawn(async move {
1808                    loop {
1809                        tokio::select! {
1810                            _ = shutdown.cancelled() => {
1811                                tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1812                                break;
1813                            }
1814                            envelope_result = request_rx_lane.recv_envelope() => {
1815                                match envelope_result {
1816                                    Ok(envelope) => {
1817                                        let request_id = envelope.request_id.clone();
1818                                        tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1819                                        // Extract and set tracing context from envelope
1820                                        #[cfg(feature = "opentelemetry")]
1821                                        let span = {
1822                                            let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1823                                            let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1824                                            set_parent_from_rpc_envelope(&span, &envelope);
1825                                            span
1826                                        };
1827
1828                                        // Shell calls have no caller_id (local process communication)
1829                                        let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1830                                        #[cfg(feature = "opentelemetry")]
1831                                        let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1832
1833                                        match handle_incoming_fut.await {
1834                                            Ok(response_bytes) => {
1835                                                // Send RESPONSE back via workload_to_shell
1836                                                // Keep same route_key (no prefix needed - separate channels!)
1837                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1838                                                let mut response_envelope = RpcEnvelope {
1839                                                    route_key: envelope.route_key.clone(),
1840                                                    payload: Some(response_bytes),
1841                                                    error: None,
1842                                                    traceparent: None,
1843                                                    tracestate: None,
1844                                                    request_id: request_id.clone(),
1845                                                    metadata: Vec::new(),
1846                                                    timeout_ms: 30000,
1847                                                };
1848                                                // Inject tracing context
1849                                                #[cfg(feature = "opentelemetry")]
1850                                                inject_span_context_to_rpc(&span, &mut response_envelope);
1851
1852                                                // Send via Guest → Shell channel
1853                                                let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1854                                                #[cfg(feature = "opentelemetry")]
1855                                                let send_response_fut = send_response_fut.instrument(span.clone());
1856                                                if let Err(e) = send_response_fut.await {
1857                                                    tracing::error!(
1858                                                        severity = 7,
1859                                                        error_category = "transport_error",
1860                                                        request_id = %request_id,
1861                                                        "❌ Failed to send RESPONSE to Shell: {:?}",
1862                                                        e
1863                                                    );
1864                                                }
1865                                            }
1866                                            Err(e) => {
1867                                                tracing::error!(
1868                                                    severity = 6,
1869                                                    error_category = "handler_error",
1870                                                    request_id = %request_id,
1871                                                    route_key = %envelope.route_key,
1872                                                    "❌ Guest message handling failed: {:?}",
1873                                                    e
1874                                                );
1875
1876                                                // Send error response (system-level error on envelope)
1877                                                let error_response = actr_protocol::ErrorResponse {
1878                                                    code: protocol_error_to_code(&e),
1879                                                    message: e.to_string(),
1880                                                };
1881                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1882                                                let mut error_envelope = RpcEnvelope {
1883                                                    route_key: envelope.route_key.clone(),
1884                                                    payload: None,
1885                                                    error: Some(error_response),
1886                                                    traceparent: envelope.traceparent.clone(),
1887                                                    tracestate: envelope.tracestate.clone(),
1888                                                    request_id: request_id.clone(),
1889                                                    metadata: Vec::new(),
1890                                                    timeout_ms: 30000,
1891                                                };
1892                                                // Inject tracing context
1893                                                #[cfg(feature = "opentelemetry")]
1894                                                inject_span_context_to_rpc(&span, &mut error_envelope);
1895
1896                                                let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1897                                                #[cfg(feature = "opentelemetry")]
1898                                                let send_error_response_fut = send_error_response_fut.instrument(span);
1899                                                if let Err(send_err) = send_error_response_fut.await {
1900                                                    tracing::error!(
1901                                                        severity = 7,
1902                                                        error_category = "transport_error",
1903                                                        request_id = %request_id,
1904                                                        "❌ Failed to send ERROR response to Shell: {:?}",
1905                                                        send_err
1906                                                    );
1907                                                }
1908                                            }
1909                                        }
1910                                    }
1911                                    Err(e) => {
1912                                        tracing::error!(
1913                                            severity = 8,
1914                                            error_category = "transport_error",
1915                                            "❌ Failed to receive from Shell → Guest lane: {:?}",
1916                                            e
1917                                        );
1918                                        break;
1919                                    }
1920                                }
1921                            }
1922                        }
1923                    }
1924                    tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1925                });
1926                task_handles.push(inproc_handle);
1927            }
1928        }
1929        tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1930
1931        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1932        // 4.7. Start Shell receive loop (Guest → Shell RESPONSE)
1933        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1934        tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1935        if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1936            // Start Shell receive loop (Guest → Shell RESPONSE)
1937            if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1938                let response_rx_lane = workload_to_shell
1939                    .get_lane(PayloadType::RpcReliable, None)
1940                    .await
1941                    .map_err(|e| {
1942                        ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1943                    })?;
1944                let request_mgr = shell_to_workload.clone();
1945                let shutdown = shutdown_token.clone();
1946
1947                let shell_receive_handle = tokio::spawn(async move {
1948                    loop {
1949                        tokio::select! {
1950                            _ = shutdown.cancelled() => {
1951                                tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1952                                break;
1953                            }
1954                            envelope_result = response_rx_lane.recv_envelope() => {
1955                                match envelope_result {
1956                                    Ok(envelope) => {
1957                                        tracing::debug!(
1958                                            "📨 Shell received RESPONSE from Guest: request_id={}",
1959                                            envelope.request_id
1960                                        );
1961
1962                                        // Check if response is success or error
1963                                        match (envelope.payload, envelope.error) {
1964                                            (Some(payload), None) => {
1965                                                // Success response
1966                                                if let Err(e) = request_mgr
1967                                                    .complete_response(&envelope.request_id, payload)
1968                                                    .await
1969                                                {
1970                                                    tracing::warn!(
1971                                                        severity = 4,
1972                                                        error_category = "orphan_response",
1973                                                        request_id = %envelope.request_id,
1974                                                        "⚠️  No pending request found for response: {:?}",
1975                                                        e
1976                                                    );
1977                                                }
1978                                            }
1979                                            (None, Some(error)) => {
1980                                                // Error response — reconstruct the precise ActrError variant
1981                                                // from the wire code so binding-visible classification
1982                                                // (UnknownRoute / PermissionDenied / TimedOut / …) is preserved
1983                                                // instead of collapsing every error into Unavailable.
1984                                                let actr_err = wire_code_to_actr_error(error.code, error.message);
1985                                                if let Err(e) = request_mgr
1986                                                    .complete_error(&envelope.request_id, actr_err)
1987                                                    .await
1988                                                {
1989                                                    tracing::warn!(
1990                                                        severity = 4,
1991                                                        error_category = "orphan_response",
1992                                                        request_id = %envelope.request_id,
1993                                                        "⚠️  No pending request found for error response: {:?}",
1994                                                        e
1995                                                    );
1996                                                }
1997                                            }
1998                                            _ => {
1999                                                tracing::error!(
2000                                                    severity = 7,
2001                                                    error_category = "protocol_error",
2002                                                    request_id = %envelope.request_id,
2003                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
2004                                                );
2005                                            }
2006                                        }
2007                                    }
2008                                    Err(e) => {
2009                                        tracing::error!(
2010                                            severity = 8,
2011                                            error_category = "transport_error",
2012                                            "❌ Failed to receive from Guest → Shell lane: {:?}",
2013                                            e
2014                                        );
2015                                        break;
2016                                    }
2017                                }
2018                            }
2019                        }
2020                    }
2021                    tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
2022                });
2023                task_handles.push(shell_receive_handle);
2024            }
2025        }
2026        tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
2027
2028        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2029        // 4.9. Mailbox backpressure watchdog
2030        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2031        //
2032        // Emits the framework `on_mailbox_backpressure` hook once per
2033        // rising-edge crossing of the configured threshold.
2034        //
2035        // Preferred path: a push-based notification from the mailbox
2036        // backend via [`Mailbox::set_depth_observer`], which runs
2037        // synchronously on every enqueue and has zero worst-case delay.
2038        //
2039        // Fallback path: mailbox backends without depth support (or
2040        // which can't cheaply compute depth on every enqueue) keep
2041        // using a 1 Hz poll of [`Mailbox::status`].
2042        let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
2043        {
2044            use std::sync::atomic::{AtomicBool, Ordering};
2045            let mailbox = node_ref.mailbox.clone();
2046            let shutdown = shutdown_token.clone();
2047            let hook_cb = node_hook_callback.clone();
2048            let triggered = Arc::new(AtomicBool::new(false));
2049
2050            // Shared rising-edge state + hook-firing closure used by
2051            // both the push and polling code paths.
2052            let fire_if_rising = {
2053                let triggered = triggered.clone();
2054                let hook_cb = hook_cb.clone();
2055                Arc::new(move |queue_len: usize| {
2056                    if queue_len >= backpressure_threshold {
2057                        if !triggered.swap(true, Ordering::AcqRel) {
2058                            if let Some(cb) = hook_cb.as_ref() {
2059                                let cb = cb.clone();
2060                                tokio::spawn(async move {
2061                                    cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
2062                                        queue_len,
2063                                        threshold: backpressure_threshold,
2064                                    })
2065                                    .await;
2066                                });
2067                            } else {
2068                                tracing::warn!(
2069                                    queue_len,
2070                                    threshold = backpressure_threshold,
2071                                    "mailbox backpressure",
2072                                );
2073                            }
2074                        }
2075                    } else if triggered.swap(false, Ordering::AcqRel) {
2076                        tracing::info!(
2077                            queue_len,
2078                            threshold = backpressure_threshold,
2079                            "mailbox backpressure cleared",
2080                        );
2081                    }
2082                })
2083            };
2084
2085            // Try the push path first. The observer installs only if
2086            // the backend supports it; otherwise `installed` is `false`
2087            // and we fall through to polling.
2088            struct EnqueueObserver {
2089                fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2090            }
2091            impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2092                fn on_depth_change(&self, queued_messages: usize) {
2093                    (self.fire)(queued_messages);
2094                }
2095            }
2096
2097            let installed = {
2098                let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2099                    Arc::new(EnqueueObserver {
2100                        fire: fire_if_rising.clone(),
2101                    });
2102                mailbox.set_depth_observer(observer)
2103            };
2104
2105            if installed {
2106                tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2107            } else {
2108                tracing::debug!(
2109                    "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2110                );
2111                let mailbox_for_poll = mailbox.clone();
2112                let shutdown_for_poll = shutdown.clone();
2113                let fire_for_poll = fire_if_rising.clone();
2114                let watchdog_handle = tokio::spawn(async move {
2115                    let mut ticker = tokio::time::interval(Duration::from_secs(1));
2116                    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2117                    loop {
2118                        tokio::select! {
2119                            _ = shutdown_for_poll.cancelled() => {
2120                                tracing::debug!(
2121                                    "mailbox backpressure watchdog shutting down"
2122                                );
2123                                break;
2124                            }
2125                            _ = ticker.tick() => {
2126                                let status = match mailbox_for_poll.status().await {
2127                                    Ok(s) => s,
2128                                    Err(e) => {
2129                                        tracing::debug!(?e, "mailbox status poll failed");
2130                                        continue;
2131                                    }
2132                                };
2133                                fire_for_poll(status.queued_messages as usize);
2134                            }
2135                        }
2136                    }
2137                });
2138                task_handles.push(watchdog_handle);
2139            }
2140        }
2141
2142        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2143        // 5. Start Mailbox processing loop (State Path)
2144        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2145        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2146        {
2147            let node = node_ref.clone();
2148            let mailbox = node_ref.mailbox.clone();
2149            let webrtc_gate = node_ref.webrtc_gate.clone();
2150            let ws_gate = node_ref.websocket_gate.clone();
2151            let shutdown = shutdown_token.clone();
2152
2153            let mailbox_handle = tokio::spawn(async move {
2154                loop {
2155                    tokio::select! {
2156                        // Listen for shutdown signal
2157                        _ = shutdown.cancelled() => {
2158                            tracing::info!("📭 Mailbox loop received shutdown signal");
2159                            break;
2160                        }
2161                        // Dequeue messages (by priority)
2162                        result = mailbox.dequeue() => {
2163                            match result {
2164                                Ok(messages) => {
2165                                    if messages.is_empty() {
2166                                        // Queue empty, sleep briefly
2167                                        tokio::time::sleep(Duration::from_millis(10)).await;
2168                                        continue;
2169                                    }
2170                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2171
2172                                    // Process messages one by one
2173                                    for msg_record in messages {
2174                                        // Deserialize RpcEnvelope (Protobuf)
2175                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
2176                                            Ok(envelope) => {
2177                                                let request_id = envelope.request_id.clone();
2178                                                let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2179                                                tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2180
2181                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
2182                                                #[cfg(feature = "opentelemetry")]
2183                                                let span = {
2184                                                    let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2185                                                    let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2186                                                    set_parent_from_rpc_envelope(&span, &envelope);
2187                                                    span
2188                                                };
2189
2190                                                // Decode caller_id from MessageRecord.from (transport layer)
2191                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
2192                                                let caller_id_ref = caller_id_result.as_ref().ok();
2193
2194                                                if caller_id_ref.is_none() {
2195                                                    tracing::warn!(
2196                                                        request_id = %request_id,
2197                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
2198                                                    );
2199                                                }
2200
2201                                                // Call handle_incoming with caller_id from transport layer
2202                                                let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2203                                                #[cfg(feature = "opentelemetry")]
2204                                                let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2205
2206                                                /// Send `response_envelope` back to `caller` via the
2207                                                /// best available transport.
2208                                                ///
2209                                                /// Priority: inbound WebSocket connection (if caller
2210                                                /// dialled us directly) → WebRTC gate.  Returns the
2211                                                /// first transport error encountered, if any.
2212                                                async fn send_envelope_to_caller(
2213                                                    ws_gate: &Option<Arc<crate::wire::websocket::WebSocketGate>>,
2214                                                    webrtc_gate: &Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
2215                                                    caller: &ActrId,
2216                                                    response_envelope: RpcEnvelope,
2217                                                    request_id: &str,
2218                                                ) {
2219                                                    // 1. Try inbound WebSocket connection first.
2220                                                    if let Some(wsg) = ws_gate {
2221                                                        match wsg.send_response(caller, response_envelope.clone()).await {
2222                                                            Ok(true) => return, // sent successfully
2223                                                            Ok(false) => {
2224                                                                tracing::debug!(
2225                                                                    request_id = request_id,
2226                                                                    caller = %caller,
2227                                                                    "No inbound WS connection for caller; falling back to WebRTC gate"
2228                                                                );
2229                                                            }
2230                                                            Err(e) => {
2231                                                                tracing::warn!(
2232                                                                    severity = 5,
2233                                                                    error_category = "transport_error",
2234                                                                    request_id = request_id,
2235                                                                    "WebSocketGate send_response failed, falling back: {:?}", e
2236                                                                );
2237                                                            }
2238                                                        }
2239                                                    }
2240
2241                                                    // 2. Fall back to WebRTC gate.
2242                                                    if let Some(gate) = webrtc_gate {
2243                                                        if let Err(e) = gate.send_response(caller, response_envelope).await {
2244                                                            tracing::error!(
2245                                                                severity = 7,
2246                                                                error_category = "transport_error",
2247                                                                request_id = request_id,
2248                                                                "❌ WebRtcGate send_response failed: {:?}", e
2249                                                            );
2250                                                        }
2251                                                    } else {
2252                                                        tracing::error!(
2253                                                            severity = 7,
2254                                                            error_category = "transport_error",
2255                                                            request_id = request_id,
2256                                                            "❌ No gate available to send response"
2257                                                        );
2258                                                    }
2259                                                }
2260
2261                                                match handle_incoming_fut.await {
2262                                                    Ok(response_bytes) => {
2263                                                        match caller_id_result {
2264                                                            Ok(caller) => {
2265                                                                // Construct response RpcEnvelope (reuse request_id!)
2266                                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2267                                                                let mut response_envelope = RpcEnvelope {
2268                                                                    request_id: request_id.clone(),
2269                                                                    route_key: envelope.route_key.clone(),
2270                                                                    payload: Some(response_bytes),
2271                                                                    error: None,
2272                                                                    traceparent: envelope.traceparent.clone(),
2273                                                                    tracestate: envelope.tracestate.clone(),
2274                                                                    metadata: Vec::new(),
2275                                                                    timeout_ms: 30000,
2276                                                                };
2277                                                                // Inject tracing context
2278                                                                #[cfg(feature = "opentelemetry")]
2279                                                                inject_span_context_to_rpc(&span, &mut response_envelope);
2280
2281                                                                #[cfg(feature = "opentelemetry")]
2282                                                                let send_fut = send_envelope_to_caller(
2283                                                                    &ws_gate,
2284                                                                    &webrtc_gate,
2285                                                                    &caller,
2286                                                                    response_envelope,
2287                                                                    &request_id,
2288                                                                ).instrument(span);
2289                                                                #[cfg(not(feature = "opentelemetry"))]
2290                                                                let send_fut = send_envelope_to_caller(
2291                                                                    &ws_gate,
2292                                                                    &webrtc_gate,
2293                                                                    &caller,
2294                                                                    response_envelope,
2295                                                                    &request_id,
2296                                                                );
2297                                                                send_fut.await;
2298                                                            }
2299                                                            Err(e) => {
2300                                                                tracing::error!(
2301                                                                    severity = 8,
2302                                                                    error_category = "protobuf_decode",
2303                                                                    request_id = %envelope.request_id,
2304                                                                    "❌ Failed to decode caller_id: {:?}",
2305                                                                    e
2306                                                                );
2307                                                            }
2308                                                        }
2309
2310                                                        // ACK message
2311                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
2312                                                            tracing::error!(
2313                                                                severity = 9,
2314                                                                error_category = "mailbox_error",
2315                                                                request_id = %envelope.request_id,
2316                                                                message_id = %msg_record.id,
2317                                                                "❌ Mailbox ACK failed: {:?}",
2318                                                                e
2319                                                            );
2320                                                        }
2321                                                    }
2322                                                    Err(e) => {
2323                                                        tracing::error!(
2324                                                            severity = 6,
2325                                                            error_category = "handler_error",
2326                                                            request_id = %envelope.request_id,
2327                                                            route_key = %envelope.route_key,
2328                                                            "❌ handle_incoming failed: {:?}", e
2329                                                        );
2330
2331                                                        // Send error envelope back to caller so it
2332                                                        // receives a structured error rather than
2333                                                        // waiting until its deadline fires.
2334                                                        if let Ok(caller) = caller_id_result {
2335                                                            let error_response = actr_protocol::ErrorResponse {
2336                                                                code: protocol_error_to_code(&e),
2337                                                                message: e.to_string(),
2338                                                            };
2339                                                            #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2340                                                            let mut error_envelope = RpcEnvelope {
2341                                                                request_id: request_id.clone(),
2342                                                                route_key: envelope.route_key.clone(),
2343                                                                payload: None,
2344                                                                error: Some(error_response),
2345                                                                traceparent: envelope.traceparent.clone(),
2346                                                                tracestate: envelope.tracestate.clone(),
2347                                                                metadata: Vec::new(),
2348                                                                timeout_ms: 30000,
2349                                                            };
2350                                                            #[cfg(feature = "opentelemetry")]
2351                                                            inject_span_context_to_rpc(&span, &mut error_envelope);
2352
2353                                                            send_envelope_to_caller(
2354                                                                &ws_gate,
2355                                                                &webrtc_gate,
2356                                                                &caller,
2357                                                                error_envelope,
2358                                                                &request_id,
2359                                                            ).await;
2360                                                        }
2361
2362                                                        // ACK to avoid infinite retries
2363                                                        let _ = mailbox.ack(msg_record.id).await;
2364                                                    }
2365                                                }
2366                                            }
2367                                            Err(e) => {
2368                                                // Poison message - cannot decode RpcEnvelope
2369                                                tracing::error!(
2370                                                    severity = 9,
2371                                                    error_category = "protobuf_decode",
2372                                                    message_id = %msg_record.id,
2373                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2374                                                    e
2375                                                );
2376
2377                                                // Write to Dead Letter Queue
2378                                                use actr_runtime_mailbox::DlqRecord;
2379                                                use chrono::Utc;
2380                                                use uuid::Uuid;
2381
2382                                                let dlq_record = DlqRecord {
2383                                                    id: Uuid::new_v4(),
2384                                                    original_message_id: Some(msg_record.id.to_string()),
2385                                                    from: Some(msg_record.from.clone()),
2386                                                    to: node.actor_id.as_ref().map(|id| {
2387                                                        let mut buf = Vec::new();
2388                                                        id.encode(&mut buf).unwrap();
2389                                                        buf
2390                                                    }),
2391                                                    raw_bytes: msg_record.payload.clone(),
2392                                                    error_message: format!("Protobuf decode failed: {e}"),
2393                                                    error_category: "protobuf_decode".to_string(),
2394                                                    trace_id: format!("mailbox-{}", msg_record.id),
2395                                                    request_id: None,
2396                                                    created_at: Utc::now(),
2397                                                    redrive_attempts: 0,
2398                                                    last_redrive_at: None,
2399                                                    context: Some(format!(
2400                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
2401                                                        match msg_record.priority {
2402                                                            actr_runtime_mailbox::MessagePriority::High => "high",
2403                                                            actr_runtime_mailbox::MessagePriority::Normal => "normal",
2404                                                        }
2405                                                    )),
2406                                                };
2407
2408                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2409                                                    tracing::error!(
2410                                                        severity = 10,
2411                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2412                                                        dlq_err
2413                                                    );
2414                                                } else {
2415                                                    tracing::warn!(
2416                                                        severity = 9,
2417                                                        "☠️ Poison message moved to DLQ: message_id={}",
2418                                                        msg_record.id
2419                                                    );
2420                                                }
2421
2422                                                // ACK the poison message to remove from mailbox
2423                                                let _ = mailbox.ack(msg_record.id).await;
2424                                            }
2425                                        }
2426                                    }
2427                                }
2428                                Err(e) => {
2429                                    tracing::error!(
2430                                        severity = 9,
2431                                        error_category = "mailbox_error",
2432                                        "❌ Mailbox dequeue failed: {:?}", e
2433                                    );
2434                                    tokio::time::sleep(Duration::from_secs(1)).await;
2435                                }
2436                            }
2437                        }
2438                    }
2439                }
2440                tracing::info!("✅ Mailbox processing loop terminated gracefully");
2441            });
2442
2443            task_handles.push(mailbox_handle);
2444        }
2445        tracing::info!("✅ Mailbox processing loop started");
2446        tracing::info!("✅ ActrNode started successfully");
2447
2448        {
2449            let ready_ctx = bootstrap_ctx_builder
2450                .build_bootstrap(&actor_id, &credential_state.credential().await);
2451            let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2452            let call_executor =
2453                lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2454            let mut workload = node_ref.workload_dispatch.lock().await;
2455            if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2456                "on_ready",
2457                workload.on_ready(ready_ctx, invocation, &call_executor),
2458            )
2459            .await
2460            {
2461                tracing::warn!(error = %e, "workload on_ready returned Err");
2462            }
2463        }
2464
2465        // Create ActrRefShared
2466        let shared = Arc::new(ActrRefShared {
2467            actor_id,
2468            bootstrap_ctx_builder,
2469            credential_state,
2470            shutdown_token,
2471            task_handles: Mutex::new(task_handles),
2472        });
2473
2474        // Create ActrRef
2475        tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2476
2477        Ok(ActrRef { shared })
2478    }
2479}