Skip to main content

actr_hyper/lifecycle/
node.rs

1//! Node runtime inner — holds all running-state fields for an attached node.
2//!
3//! This module is the internal implementation backing the public
4//! `Node<Attached>` / `Node<Registered>` typestate chain defined in
5//! `crate::lib`. The struct itself is crate-private; consumers interact with
6//! it indirectly through `Node<S>` → `ActrRef` transitions.
7
8use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21    AIdCredential, ActorResult, ActrError, ActrId, ConnectionNotReadyInfo, PayloadType,
22    RegisterAuthMode, RegisterRequest, RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
/// Internal running-state of an attached node.
///
/// Holds every field required to run a workload after `Hyper::attach` has
/// bound a package. Kept private to the crate: external callers use the
/// public `Node<S>` wrappers in `crate::lib` and the `ActrRef` handle
/// returned by `Node::start`.
///
/// Several fields are `Option`s that are only populated once the node has
/// started; for example `handle_incoming` returns `ActrError::Internal` while
/// `actor_id` or `credential_state` is still `None`.
///
/// NOTE(review): Rust drops struct fields in declaration order, so reordering
/// the fields below also changes teardown order.
pub(crate) struct Inner {
    /// Runtime configuration (its `acl` is consulted by `handle_incoming`
    /// before any message is dispatched).
    pub(crate) config: actr_config::RuntimeConfig,

    /// Persistent mailbox (documented as SQLite-backed; the field accepts any
    /// `Mailbox` implementation).
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// In-process gate for `Dest::Shell` / `Dest::Local` calls.
    ///
    /// Created in `build()` together with `shell_to_workload` so the inproc
    /// lane is usable as soon as the node exists, even before registration.
    pub(crate) inproc_gate: Gate,

    /// Cross-process gate for `Dest::Actor(_)` calls.
    ///
    /// `None` until `start()` finishes WebRTC / PeerGate initialization. Any
    /// outbound call issued before that point returns `Internal("PeerGate
    /// not initialized yet")` — see `RuntimeContext::select_gate`.
    pub(crate) outproc_gate: Option<Gate>,

    /// DataStream callback registry shared between the inbound WebRTC / WS
    /// gates (which dispatch into it) and `RuntimeContext`
    /// (register_stream / send_data_stream).
    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,

    /// MediaTrack callback registry shared between WebRTC media tracks and
    /// `RuntimeContext` (register_media_track / send_media_sample).
    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,

    /// Signaling client
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID (obtained after startup). `handle_incoming` refuses to
    /// process messages while this is `None`.
    pub(crate) actor_id: Option<ActrId>,

    /// Actor Credential (obtained after startup, used for subsequent
    /// authentication messages). `handle_incoming` refuses to process
    /// messages while this is `None`.
    pub(crate) credential_state: Option<CredentialState>,

    /// WebRTC coordinator (created after startup)
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    /// Peer transport manager (created after startup)
    pub(crate) peer_transport: Option<Arc<crate::transport::PeerTransport>>,

    /// WebRTC Gate (created after startup)
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// WebSocket Gate (direct-connect mode inbound, optional)
    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,

    /// Shell → Workload transport (REQUEST direction)
    ///
    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,

    /// Workload → Shell transport (RESPONSE direction)
    ///
    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,

    /// Shutdown token for graceful shutdown
    pub(crate) shutdown_token: CancellationToken,

    /// Packaged manifest.lock.toml content loaded at startup for fingerprint lookups.
    ///
    /// Wrapped in `Arc` so per-request `RuntimeContext` clones only bump a refcount
    /// instead of deep-cloning the dependency vector.
    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,

    /// Network event receiver (from NetworkEventHandle)
    pub(crate) network_event_rx:
        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>>,

    /// Network event debounce configuration
    pub(crate) network_event_debounce_config:
        Option<crate::lifecycle::network_event::DebounceConfig>,

    /// Request deduplication state: a response cache whose entries live for
    /// `DEDUP_TTL` (documented as 15 s). Prevents double-processing when a
    /// caller retries the same `request_id`.
    pub(crate) dedup_state: Arc<Mutex<DedupState>>,

    /// Verified package manifest for package-backed nodes.
    // NOTE(review): `dead_code` is allowed presumably because nothing reads
    // this field yet (its accessor is also marked `allow(dead_code)`) — confirm.
    #[allow(dead_code)]
    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,

    /// Pre-issued registration credential injected by the Hyper layer during
    /// the `Attached → Registered` state transition. `start()` uses it directly
    /// instead of re-registering with the signaling server.
    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,

    /// Shared WebSocket direct-connect address map populated by discovery.
    ///
    /// Shared with `DefaultWireBuilder` so discovered ws:// URLs can be reused
    /// directly instead of relying on a static url_template.
    /// The map is keyed by `ActrId`.
    pub(crate) discovered_ws_addresses:
        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,

    /// Runtime workload (WASM, dynclib, etc.)
    ///
    /// `handle_incoming` dispatches through this workload.
    ///
    /// The `Mutex` serializes dispatch into a single guest actor instance:
    /// `WasmWorkload::handle` and `DynClibWorkload::handle` both take
    /// `&mut self` because the underlying Wasmtime `Store` / native guest
    /// ABI is single-threaded, so concurrent dispatch through the same
    /// instance would be unsound. Lifecycle hooks also take this lock because
    /// package-backed WASM / dynclib workloads expose them on the same guest
    /// instance; transport and other observation hooks reach linked workloads
    /// through `hook_observer` without holding this lock.
    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,

    /// Optional shell-side observer that receives linked-workload transport /
    /// credential / mailbox hook invocations.
    ///
    /// `None` means "no observer installed"; the built-in tracing defaults
    /// still fire from the event-source wiring sites. When `Some`, hook
    /// invocations are dispatched through `lifecycle::hooks::spawn_hook`
    /// so panics in observer code cannot unwind into the event source.
    // NOTE(review): `dead_code` is allowed presumably because the field is not
    // read in every build configuration yet — confirm before removing.
    #[allow(dead_code)]
    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,

    /// Queue-length threshold at which the mailbox backpressure
    /// watchdog fires the framework `on_mailbox_backpressure` hook.
    ///
    /// Resolved from `HyperConfig` at node construction time so the
    /// runtime loop does not need to hold a reference back to `HyperConfig`.
    pub(crate) mailbox_backpressure_threshold: usize,

    /// Lead time before credential expiry at which the framework fires
    /// the `on_credential_expiring` hook. Resolved from `HyperConfig`
    /// at node construction time.
    // NOTE(review): `dead_code` is allowed presumably because the expiry
    // watchdog does not read this yet — confirm before removing.
    #[allow(dead_code)]
    pub(crate) credential_expiry_warning: Duration,
}
175
/// Credential state for shared access between tasks.
///
/// Cloning is cheap: every clone shares the same `Arc<RwLock<..>>`, so a
/// renewal applied through `update` is visible to all clones.
#[derive(Clone)]
pub struct CredentialState {
    /// Shared, lock-protected credential snapshot.
    inner: Arc<RwLock<CredentialStateInner>>,
}
181
/// Values guarded by `CredentialState`'s lock.
///
/// `CredentialState::update` replaces them under a single write lock.
#[derive(Clone)]
struct CredentialStateInner {
    /// Actor credential used for subsequent authenticated messages.
    credential: AIdCredential,
    /// Expiry of `credential`, when the issuer supplied one.
    expires_at: Option<prost_types::Timestamp>,
    /// HMAC time-limited TURN credential, updated together with credential on registration/renewal
    turn_credential: Option<TurnCredential>,
}
189
190impl CredentialState {
191    /// Create a new CredentialState with TURN credential
192    pub fn new(
193        credential: AIdCredential,
194        expires_at: Option<prost_types::Timestamp>,
195        turn_credential: Option<TurnCredential>,
196    ) -> Self {
197        Self {
198            inner: Arc::new(RwLock::new(CredentialStateInner {
199                credential,
200                expires_at,
201                turn_credential,
202            })),
203        }
204    }
205
206    pub async fn credential(&self) -> AIdCredential {
207        self.inner.read().await.credential.clone()
208    }
209
210    pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
211        self.inner.read().await.expires_at
212    }
213
214    /// Get TURN credential (HMAC time-limited credential)
215    pub async fn turn_credential(&self) -> Option<TurnCredential> {
216        self.inner.read().await.turn_credential.clone()
217    }
218
219    /// Update credential and TURN credential
220    ///
221    /// Called on credential renewal; only overwrites the old TURN credential when the new one is not empty
222    pub(crate) async fn update(
223        &self,
224        credential: AIdCredential,
225        expires_at: Option<prost_types::Timestamp>,
226        turn_credential: Option<TurnCredential>,
227    ) {
228        let mut guard = self.inner.write().await;
229        guard.credential = credential;
230        guard.expires_at = expires_at;
231        if turn_credential.is_some() {
232            guard.turn_credential = turn_credential;
233        }
234    }
235}
236
237/// Host operation executor - routes guest outbound calls through RuntimeContext
238///
239/// Called by the workload dispatch path in `handle_incoming`.
240async fn host_operation_handler(
241    ctx: crate::context::RuntimeContext,
242    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
243    pending: crate::workload::HostOperation,
244) -> crate::workload::HostOperationResult {
245    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
246    use actr_framework::guest::dynclib_abi::code as abi_code;
247    use actr_framework::{Context as _, Dest};
248    use actr_protocol::{DataStream, PayloadType};
249
250    /// Map `ActrError` to ABI error code, preserving semantics for guest-side discrimination
251    fn actr_error_to_code(err: &ActrError) -> i32 {
252        match err {
253            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
254            _ => abi_code::GENERIC_ERROR,
255        }
256    }
257
258    match pending {
259        HostOperation::CallRaw(req) => {
260            match ctx
261                .call_raw(
262                    &Dest::Actor(req.target),
263                    req.route_key,
264                    PayloadType::RpcReliable,
265                    bytes::Bytes::from(req.payload),
266                    30_000,
267                )
268                .await
269            {
270                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
271                Err(e) => {
272                    tracing::error!("call_raw routing failed: {e:?}");
273                    HostOperationResult::Error(actr_error_to_code(&e))
274                }
275            }
276        }
277
278        HostOperation::Call(req) => {
279            let dest = match decode_dest(&req.dest) {
280                Some(d) => d,
281                None => {
282                    tracing::error!(route_key = req.route_key, "call: dest decode failed");
283                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
284                }
285            };
286            match ctx
287                .call_raw(
288                    &dest,
289                    req.route_key,
290                    PayloadType::RpcReliable,
291                    bytes::Bytes::from(req.payload),
292                    30_000,
293                )
294                .await
295            {
296                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
297                Err(e) => {
298                    tracing::error!("call routing failed: {e:?}");
299                    HostOperationResult::Error(actr_error_to_code(&e))
300                }
301            }
302        }
303
304        HostOperation::Tell(req) => {
305            let dest = match decode_dest(&req.dest) {
306                Some(d) => d,
307                None => {
308                    tracing::error!(route_key = req.route_key, "tell: dest decode failed");
309                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
310                }
311            };
312            match ctx
313                .tell_raw(
314                    &dest,
315                    req.route_key,
316                    PayloadType::RpcReliable,
317                    bytes::Bytes::from(req.payload),
318                )
319                .await
320            {
321                Ok(()) => HostOperationResult::Done,
322                Err(e) => {
323                    tracing::error!("tell routing failed: {e:?}");
324                    HostOperationResult::Error(actr_error_to_code(&e))
325                }
326            }
327        }
328
329        HostOperation::Discover(req) => {
330            match ctx.discover_route_candidate(&req.target_type).await {
331                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
332                Err(e) => {
333                    tracing::error!("discover failed: {e:?}");
334                    HostOperationResult::Error(actr_error_to_code(&e))
335                }
336            }
337        }
338
339        HostOperation::RegisterStream(req) => {
340            let stream_id = req.stream_id;
341            let callback_ctx = ctx.clone();
342            let callback_workload_dispatch = workload_dispatch.clone();
343            match ctx
344                .register_stream(stream_id, move |chunk: DataStream, sender| {
345                    let ctx_for_executor = callback_ctx.clone();
346                    let workload_dispatch = callback_workload_dispatch.clone();
347                    Box::pin(async move {
348                        let invocation = crate::workload::InvocationContext {
349                            self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
350                            caller_id: Some(sender.clone()),
351                            request_id: format!(
352                                "data-stream:{}:{}",
353                                chunk.stream_id, chunk.sequence
354                            ),
355                        };
356                        let call_executor: crate::workload::HostAbiFn =
357                            std::sync::Arc::new(move |pending| {
358                                let ctx = ctx_for_executor.clone();
359                                Box::pin(async move {
360                                    stream_callback_host_operation_handler(ctx, pending).await
361                                })
362                            });
363                        let mut guard = workload_dispatch.lock().await;
364                        guard
365                            .dispatch_data_stream(chunk, sender, invocation, &call_executor)
366                            .await
367                    })
368                })
369                .await
370            {
371                Ok(()) => HostOperationResult::Done,
372                Err(e) => {
373                    tracing::error!("register_stream failed: {e:?}");
374                    HostOperationResult::Error(actr_error_to_code(&e))
375                }
376            }
377        }
378
379        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
380            Ok(()) => HostOperationResult::Done,
381            Err(e) => {
382                tracing::error!("unregister_stream failed: {e:?}");
383                HostOperationResult::Error(actr_error_to_code(&e))
384            }
385        },
386
387        HostOperation::SendDataStream(req) => {
388            let dest = match decode_dest(&req.dest) {
389                Some(d) => d,
390                None => {
391                    tracing::error!("send_data_stream: dest decode failed");
392                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
393                }
394            };
395            let payload_type = match PayloadType::try_from(req.payload_type) {
396                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
397                    PayloadType::try_from(req.payload_type).expect("checked payload type")
398                }
399                Ok(other) => {
400                    tracing::error!(?other, "send_data_stream: invalid stream payload type");
401                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
402                }
403                Err(_) => {
404                    tracing::error!(
405                        payload_type = req.payload_type,
406                        "send_data_stream: unknown payload type"
407                    );
408                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
409                }
410            };
411            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
412                Ok(()) => HostOperationResult::Done,
413                Err(e) => {
414                    tracing::error!("send_data_stream failed: {e:?}");
415                    HostOperationResult::Error(actr_error_to_code(&e))
416                }
417            }
418        }
419    }
420}
421
422fn lifecycle_invocation(
423    actor_id: &ActrId,
424    request_id: &'static str,
425) -> crate::workload::InvocationContext {
426    crate::workload::InvocationContext {
427        self_id: actor_id.clone(),
428        caller_id: None,
429        request_id: request_id.to_string(),
430    }
431}
432
433pub(crate) fn lifecycle_host_abi(
434    ctx: crate::context::RuntimeContext,
435    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
436) -> crate::workload::HostAbiFn {
437    std::sync::Arc::new(move |pending| {
438        let ctx = ctx.clone();
439        let workload_dispatch = workload_dispatch.clone();
440        Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
441    })
442}
443
444async fn stream_callback_host_operation_handler(
445    ctx: crate::context::RuntimeContext,
446    pending: crate::workload::HostOperation,
447) -> crate::workload::HostOperationResult {
448    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
449    use actr_framework::guest::dynclib_abi::code as abi_code;
450    use actr_framework::{Context as _, Dest};
451    use actr_protocol::PayloadType;
452
453    fn actr_error_to_code(err: &ActrError) -> i32 {
454        match err {
455            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
456            _ => abi_code::GENERIC_ERROR,
457        }
458    }
459
460    match pending {
461        HostOperation::CallRaw(req) => {
462            match ctx
463                .call_raw(
464                    &Dest::Actor(req.target),
465                    req.route_key,
466                    PayloadType::RpcReliable,
467                    bytes::Bytes::from(req.payload),
468                    30_000,
469                )
470                .await
471            {
472                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
473                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
474            }
475        }
476        HostOperation::Call(req) => {
477            let dest = match decode_dest(&req.dest) {
478                Some(d) => d,
479                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
480            };
481            match ctx
482                .call_raw(
483                    &dest,
484                    req.route_key,
485                    PayloadType::RpcReliable,
486                    bytes::Bytes::from(req.payload),
487                    30_000,
488                )
489                .await
490            {
491                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
492                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
493            }
494        }
495        HostOperation::Tell(req) => {
496            let dest = match decode_dest(&req.dest) {
497                Some(d) => d,
498                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
499            };
500            match ctx
501                .tell_raw(
502                    &dest,
503                    req.route_key,
504                    PayloadType::RpcReliable,
505                    bytes::Bytes::from(req.payload),
506                )
507                .await
508            {
509                Ok(()) => HostOperationResult::Done,
510                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
511            }
512        }
513        HostOperation::Discover(req) => {
514            match ctx.discover_route_candidate(&req.target_type).await {
515                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
516                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
517            }
518        }
519        HostOperation::RegisterStream(_) => {
520            tracing::error!("register_stream from inside a stream callback is not supported");
521            HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
522        }
523        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
524            Ok(()) => HostOperationResult::Done,
525            Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
526        },
527        HostOperation::SendDataStream(req) => {
528            let dest = match decode_dest(&req.dest) {
529                Some(d) => d,
530                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
531            };
532            let payload_type = match PayloadType::try_from(req.payload_type) {
533                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
534                    PayloadType::try_from(req.payload_type).expect("checked payload type")
535                }
536                Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
537            };
538            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
539                Ok(()) => HostOperationResult::Done,
540                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
541            }
542        }
543    }
544}
545
546/// Map `ActrError` to a stable, unique numeric code for the wire
547/// `ErrorResponse`.  Each variant has a distinct code so
548/// `wire_code_to_actr_error` can reconstruct the exact variant on the
549/// receiving side.
550///
551/// Code allocation (100xx namespace to avoid collisions with HTTP):
552///
553/// | code  | variant            |
554/// |-------|--------------------|
555/// | 10001 | Unavailable        |
556/// | 10002 | TimedOut           |
557/// | 10003 | NotFound           |
558/// | 10004 | PermissionDenied   |
559/// | 10005 | InvalidArgument    |
560/// | 10006 | UnknownRoute       |
561/// | 10007 | DependencyNotFound |
562/// | 10008 | DecodeFailure      |
563/// | 10009 | NotImplemented     |
564/// | 10010 | Internal           |
565/// | 10011 | ConnectionNotReady |
566pub(crate) fn protocol_error_to_code(err: &ActrError) -> u32 {
567    match err {
568        ActrError::Unavailable(_) => 10001,
569        ActrError::ConnectionNotReady(_) => 10011,
570        ActrError::TimedOut => 10002,
571        ActrError::NotFound(_) => 10003,
572        ActrError::PermissionDenied(_) => 10004,
573        ActrError::InvalidArgument(_) => 10005,
574        ActrError::UnknownRoute(_) => 10006,
575        ActrError::DependencyNotFound { .. } => 10007,
576        ActrError::DecodeFailure(_) => 10008,
577        ActrError::NotImplemented(_) => 10009,
578        ActrError::Internal(_) => 10010,
579    }
580}
581
582/// Reconstruct an `ActrError` from a wire code + message pair.
583///
584/// This is the inverse of `protocol_error_to_code`.  Unknown codes fall
585/// back to `ActrError::Unavailable` to avoid silent data loss.
586pub(crate) fn wire_code_to_actr_error(code: u32, message: String) -> ActrError {
587    match code {
588        10001 => ActrError::Unavailable(message),
589        10002 => ActrError::TimedOut,
590        10003 => ActrError::NotFound(message),
591        10004 => ActrError::PermissionDenied(message),
592        10005 => ActrError::InvalidArgument(message),
593        10006 => ActrError::UnknownRoute(message),
594        10007 => ActrError::DependencyNotFound {
595            service_name: String::new(),
596            message,
597        },
598        10008 => ActrError::DecodeFailure(message),
599        10009 => ActrError::NotImplemented(message),
600        10010 => ActrError::Internal(message),
601        10011 => ActrError::ConnectionNotReady(ConnectionNotReadyInfo {
602            retry_after_ms: parse_connection_not_ready_retry_hint(&message),
603        }),
604        // Legacy HTTP-ish codes emitted before this scheme was introduced,
605        // or any unknown future code: treat as Unavailable.
606        _ => ActrError::Unavailable(format!("rpc error {code}: {message}")),
607    }
608}
609
/// Best-effort extraction of the retry hint from a `ConnectionNotReady`
/// error message.
///
/// `ErrorResponse` only carries a code and a display string, so the hint is
/// recovered by locating the first `retry_after_ms=Some(` marker and parsing
/// the text up to the next `)` as a `u64`. Any other shape yields `None`.
/// The parser stays deliberately narrow until the wire error payload carries
/// typed details.
fn parse_connection_not_ready_retry_hint(message: &str) -> Option<u64> {
    const MARKER: &str = "retry_after_ms=Some(";

    let (_, after_marker) = message.split_once(MARKER)?;
    let (digits, _) = after_marker.split_once(')')?;
    digits.parse::<u64>().ok()
}
619
620impl Inner {
621    #[allow(dead_code)]
622    pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
623        self.package_manifest.as_ref()
624    }
625
626    /// Network event processing loop (background task)
627    ///
628    /// # Responsibilities
629    /// - Receive network events from Channel
630    /// - Delegate to NetworkEventProcessor for handling
631    /// - Record processing time and send results
632    async fn network_event_loop(
633        event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>,
634        event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
635        shutdown_token: CancellationToken,
636    ) {
637        crate::lifecycle::network_event::run_network_event_reconciler(
638            event_rx,
639            event_processor,
640            shutdown_token,
641        )
642        .await;
643    }
644
645    fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
646        if timeout_ms > 0 {
647            Duration::from_millis(timeout_ms as u64)
648        } else {
649            DEDUP_TTL
650        }
651    }
652
653    async fn wait_for_inflight_duplicate(
654        mut waiter: DedupWaiter,
655        timeout: Duration,
656    ) -> ActorResult<Bytes> {
657        let wait_for_result = async {
658            loop {
659                if let Some(result) = waiter.borrow().clone() {
660                    return result;
661                }
662
663                if waiter.changed().await.is_err() {
664                    if let Some(result) = waiter.borrow().clone() {
665                        return result;
666                    }
667                    return Err(ActrError::Unavailable(
668                        "duplicate request result unavailable".to_string(),
669                    ));
670                }
671            }
672        };
673
674        match tokio::time::timeout(timeout, wait_for_result).await {
675            Ok(result) => result,
676            Err(_) => Err(ActrError::Unavailable(format!(
677                "duplicate request in-flight timed out after {}ms",
678                timeout.as_millis()
679            ))),
680        }
681    }
682
683    /// - Single-hop calls: effectively identical
684    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
685    #[cfg_attr(
686        feature = "opentelemetry",
687        tracing::instrument(
688            skip_all,
689            name = "ActrNode.handle_incoming",
690            fields(
691                actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
692                route_key = %envelope.route_key,
693                request_id = %envelope.request_id,
694            )
695        )
696    )]
697    pub async fn handle_incoming(
698        &self,
699        envelope: RpcEnvelope,
700        caller_id: Option<&ActrId>,
701    ) -> ActorResult<Bytes> {
702        // Log received message
703        if let Some(caller) = caller_id {
704            tracing::debug!(
705                "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
706                envelope.route_key,
707                caller,
708                envelope.request_id
709            );
710        } else {
711            tracing::debug!(
712                "📨 Handling incoming message: route_key={}, request_id={}",
713                envelope.route_key,
714                envelope.request_id
715            );
716        }
717
718        // 0. Get actor_id early for ACL check
719        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
720            ActrError::Internal(
721                "Actor ID not set - node must be started before handling messages".to_string(),
722            )
723        })?;
724
725        // 0.1. ACL Permission Check (before processing message)
726        let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
727            .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
728
729        if !acl_allowed {
730            tracing::warn!(
731                severity = 5,
732                error_category = "acl_denied",
733                request_id = %envelope.request_id,
734                route_key = %envelope.route_key,
735                caller = %caller_id
736                    .map(|c| c.to_string())
737                    .unwrap_or_else(|| "<none>".to_string()),
738                "🚫 ACL: Permission denied"
739            );
740
741            return Err(ActrError::PermissionDenied(format!(
742                "ACL denied: {} is not allowed to call {}",
743                caller_id
744                    .map(|c| c.to_string())
745                    .unwrap_or_else(|| "<unknown>".to_string()),
746                actor_id
747            )));
748        }
749
750        // 0.2. Deduplication: return cached response for retried request_ids
751        let outcome = {
752            self.dedup_state
753                .lock()
754                .await
755                .check_or_mark(&envelope.request_id)
756        };
757        match outcome {
758            DedupOutcome::Fresh => {} // proceed normally
759            DedupOutcome::InFlight(waiter) => {
760                tracing::debug!(
761                    request_id = %envelope.request_id,
762                    route_key = %envelope.route_key,
763                    "duplicate request in-flight; waiting for original result"
764                );
765                return Self::wait_for_inflight_duplicate(
766                    waiter,
767                    Self::duplicate_wait_timeout(envelope.timeout_ms),
768                )
769                .await;
770            }
771            DedupOutcome::Duplicate(cached) => {
772                tracing::debug!(
773                    request_id = %envelope.request_id,
774                    route_key = %envelope.route_key,
775                    "♻️ returning cached response for duplicate request_id"
776                );
777                return cached;
778            }
779        }
780
781        // 1. Create Context with caller_id from transport layer
782        let credential_state = self.credential_state.clone().ok_or_else(|| {
783            ActrError::Internal(
784                "Credential not set - node must be started before handling messages".to_string(),
785            )
786        })?;
787        let ctx = self.make_runtime_context(
788            actor_id,
789            caller_id, // caller_id from transport layer (MessageRecord.from)
790            &envelope.request_id,
791            &credential_state.credential().await,
792        );
793
794        // 2. Dispatch
795        let dispatch_ctx = crate::workload::InvocationContext {
796            self_id: actor_id.clone(),
797            caller_id: caller_id.cloned(),
798            request_id: envelope.request_id.clone(),
799        };
800        let ctx_for_executor = ctx.clone();
801        let workload_for_executor = self.workload_dispatch.clone();
802        let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
803            let ctx = ctx_for_executor.clone();
804            let workload_dispatch = workload_for_executor.clone();
805            Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
806        });
807
808        let mut guard = self.workload_dispatch.lock().await;
809        let result = guard
810            .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
811            .await;
812
813        match &result {
814            Ok(_) => tracing::debug!(
815                request_id = %envelope.request_id,
816                route_key = %envelope.route_key,
817                "✅ Message handled successfully"
818            ),
819            Err(e) => tracing::error!(
820                severity = 6,
821                error_category = "handler_error",
822                request_id = %envelope.request_id,
823                route_key = %envelope.route_key,
824                "❌ Message handling failed: {:?}", e
825            ),
826        }
827
828        // 3. Store completed result in dedup cache before returning
829        self.dedup_state
830            .lock()
831            .await
832            .complete(&envelope.request_id, result.clone());
833
834        result
835    }
836
837    /// Build a new `Inner` from config and runtime workload.
838    ///
839    /// This is the internal constructor behind the public node builders and
840    /// Hyper package attach helpers.
841    pub(crate) async fn build(
842        config: actr_config::RuntimeConfig,
843        workload: crate::workload::Workload,
844        package_manifest: Option<actr_pack::PackageManifest>,
845        packaged_lock: Option<actr_config::lock::LockFile>,
846        mailbox_backpressure_threshold: usize,
847        credential_expiry_warning: Duration,
848    ) -> ActorResult<Self> {
849        use crate::outbound::{Gate, HostGate};
850        use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
851
852        tracing::info!("🚀 Initializing ActrNode");
853
854        // Initialize Mailbox
855        let mailbox_path = config
856            .mailbox_path
857            .as_ref()
858            .map(|p| p.to_string_lossy().to_string())
859            .unwrap_or_else(|| ":memory:".to_string());
860
861        tracing::info!("📂 Mailbox database path: {}", mailbox_path);
862
863        let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
864            actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
865                .await
866                .map_err(|e| {
867                    actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
868                })?,
869        );
870
871        // Initialize Dead Letter Queue
872        let dlq_path = if mailbox_path == ":memory:" {
873            ":memory:".to_string()
874        } else {
875            format!("{mailbox_path}.dlq")
876        };
877
878        let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
879            actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
880                .await
881                .map_err(|e| {
882                    actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
883                })?,
884        );
885        tracing::info!("✅ Dead Letter Queue initialized");
886
887        // Initialize signaling client
888        let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
889            Some("answer".to_string())
890        } else {
891            None
892        };
893
894        let signaling_config = SignalingConfig {
895            server_url: config.signaling_url.clone(),
896            connection_timeout: 30,
897            heartbeat_interval: 30,
898            reconnect_config: ReconnectConfig::default(),
899            auth_config: None,
900            webrtc_role,
901        };
902
903        let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
904        client.start_reconnect_manager();
905        let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
906
907        // Initialize inproc infrastructure (Shell ↔ Guest)
908        let shell_to_workload = Arc::new(HostTransport::new());
909        let workload_to_shell = Arc::new(HostTransport::new());
910        let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
911
912        let data_stream_registry = Arc::new(DataStreamRegistry::new());
913        let media_frame_registry = Arc::new(MediaFrameRegistry::new());
914
915        tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
916
917        let actr_lock = if let Some(lock) = packaged_lock {
918            tracing::info!(
919                "📋 Loaded packaged manifest.lock.toml with {} dependencies",
920                lock.dependencies.len()
921            );
922            Some(Arc::new(lock))
923        } else {
924            tracing::warn!(
925                "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
926            );
927            None
928        };
929
930        tracing::info!("✅ ActrNode initialized");
931
932        Ok(Self {
933            config,
934            mailbox,
935            dlq,
936            inproc_gate,
937            outproc_gate: None, // Populated in start() once WebRTC / PeerGate is ready.
938            data_stream_registry,
939            media_frame_registry,
940            signaling_client,
941            actor_id: None,
942            credential_state: None,
943            webrtc_coordinator: None,
944            peer_transport: None,
945            webrtc_gate: None,
946            websocket_gate: None,
947            shell_to_workload: Some(shell_to_workload),
948            workload_to_shell: Some(workload_to_shell),
949            shutdown_token: CancellationToken::new(),
950            actr_lock,
951            network_event_rx: None,
952            network_event_debounce_config: None,
953            dedup_state: Arc::new(Mutex::new(DedupState::new())),
954            package_manifest,
955            preregistered_credential: None,
956            discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
957                std::collections::HashMap::new(),
958            )),
959            workload_dispatch: Arc::new(Mutex::new(workload)),
960            hook_observer: None,
961            mailbox_backpressure_threshold,
962            credential_expiry_warning,
963        })
964    }
965
966    /// Snapshot the current runtime handles into a `BootstrapContextBuilder`.
967    ///
968    /// The returned builder is cloned into long-lived hook closures and into
969    /// `ActrRefShared` so those paths can materialize bootstrap contexts
970    /// without retaining a reference back to `Inner`. The snapshot freezes
971    /// `outproc_gate` and `actr_lock` at call time — callers that want to
972    /// observe a later-initialized `outproc_gate` must rebuild.
973    pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
974        BootstrapContextBuilder::new(
975            self.inproc_gate.clone(),
976            self.outproc_gate.clone(),
977            self.data_stream_registry.clone(),
978            self.media_frame_registry.clone(),
979            self.signaling_client.clone(),
980            self.actr_lock.clone(),
981            self.discovered_ws_addresses.clone(),
982        )
983    }
984
985    /// Build a `RuntimeContext` for the per-request dispatch path.
986    ///
987    /// Unlike `BootstrapContextBuilder::build_bootstrap`, this carries the
988    /// envelope's caller identity and request id through into the context.
989    pub(crate) fn make_runtime_context(
990        &self,
991        self_id: &ActrId,
992        caller_id: Option<&ActrId>,
993        request_id: &str,
994        credential: &AIdCredential,
995    ) -> RuntimeContext {
996        RuntimeContext::new(
997            self_id.clone(),
998            caller_id.cloned(),
999            request_id.to_string(),
1000            self.inproc_gate.clone(),
1001            self.outproc_gate.clone(),
1002            self.data_stream_registry.clone(),
1003            self.media_frame_registry.clone(),
1004            self.signaling_client.clone(),
1005            credential.clone(),
1006            self.actr_lock.clone(),
1007            self.discovered_ws_addresses.clone(),
1008        )
1009    }
1010
1011    /// Create network event processing infrastructure (called on demand, before `start()`).
1012    ///
1013    /// # Parameters
1014    /// - `debounce_ms`: Debounce window in milliseconds. If 0, no debounce.
1015    ///
1016    /// # Panics
1017    /// Panics if called more than once.
1018    pub fn create_network_event_handle(
1019        &mut self,
1020        debounce_ms: u64,
1021    ) -> crate::lifecycle::NetworkEventHandle {
1022        if self.network_event_rx.is_some() {
1023            panic!("create_network_event_handle() can only be called once");
1024        }
1025
1026        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
1027
1028        let debounce_config = if debounce_ms > 0 {
1029            Some(crate::lifecycle::network_event::DebounceConfig {
1030                window: std::time::Duration::from_millis(debounce_ms),
1031            })
1032        } else {
1033            None
1034        };
1035
1036        self.network_event_rx = Some(event_rx);
1037        self.network_event_debounce_config = debounce_config;
1038
1039        tracing::info!(
1040            debounce_ms,
1041            channel_capacity = 100_u64,
1042            "network_event.node.handle_created"
1043        );
1044
1045        crate::lifecycle::NetworkEventHandle::new(event_tx)
1046    }
1047
1048    /// Attach a credential already issued by AIS so that `start()` can skip
1049    /// the signaling registration step.
1050    ///
1051    /// Called by the Hyper layer between `Hyper::register()` and `Hyper::start()`.
1052    pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
1053        tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
1054        self.preregistered_credential = Some(register_ok);
1055    }
1056
1057    /// Start the system
1058    pub async fn start(mut self) -> ActorResult<ActrRef> {
1059        tracing::info!("🚀 Starting ActrNode");
1060        tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1061
1062        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1063        // 1. Build RegisterRequest
1064        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1065        // Get ActrType from configuration
1066        let actr_type = self.config.actr_type().clone();
1067        tracing::info!("📋 Actor type: {}", actr_type);
1068
1069        // ServiceSpec is derived by the Hyper layer from the verified package
1070        // (see `service_spec::calculate_service_spec_from_package`). The raw
1071        // ActrNode::start() path has no package context and always sends None
1072        // on its own RegisterRequest; callers that need a spec must go
1073        // through `Hyper::register()`.
1074        let service_spec = None;
1075
1076        // If a WebSocket listen port is configured, build the advertised ws:// address
1077        // to register with the signaling server so clients can discover it.
1078        let ws_address = if let Some(port) = self.config.websocket_listen_port {
1079            let host = self
1080                .config
1081                .websocket_advertised_host
1082                .as_deref()
1083                .unwrap_or("127.0.0.1");
1084            Some(format!("ws://{}:{}", host, port))
1085        } else {
1086            None
1087        };
1088
1089        if let Some(ref addr) = ws_address {
1090            tracing::info!(
1091                "📡 Advertising WebSocket address to signaling server: {}",
1092                addr
1093            );
1094        }
1095
1096        let register_request = RegisterRequest {
1097            actr_type: actr_type.clone(),
1098            realm: self.config.realm,
1099            service_spec,
1100            acl: self.config.acl.clone(),
1101            service: None,
1102            ws_address,
1103            auth_mode: Some(RegisterAuthMode::Linked as i32),
1104            ..Default::default()
1105        };
1106
1107        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1108        // 1. Obtain registration info (Hyper pre-injected or AIS HTTP)
1109        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1110        let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1111            tracing::info!(
1112                "Using Hyper pre-injected registration credential; skipping AIS registration"
1113            );
1114            injected
1115        } else {
1116            let ais_endpoint = &self.config.ais_endpoint;
1117            tracing::info!(
1118                ais_endpoint = %ais_endpoint,
1119                "Registering actor with AIS via HTTP"
1120            );
1121            let mut ais = AisClient::new(ais_endpoint);
1122            if let Some(ref secret) = self.config.realm_secret {
1123                ais = ais.with_realm_secret(secret);
1124            }
1125            let resp = ais
1126                .register_linked(register_request.clone())
1127                .await
1128                .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1129            match resp.result {
1130                Some(register_response::Result::Success(ok)) => {
1131                    tracing::info!("✅ AIS HTTP registration successful");
1132                    ok
1133                }
1134                Some(register_response::Result::Error(error)) => {
1135                    tracing::error!(
1136                        severity = 10,
1137                        error_category = "registration_error",
1138                        error_code = error.code,
1139                        "❌ AIS registration failed: code={}, message={}",
1140                        error.code,
1141                        error.message
1142                    );
1143                    return Err(ActrError::Unavailable(format!(
1144                        "AIS registration rejected: {} (code: {})",
1145                        error.message, error.code
1146                    )));
1147                }
1148                None => {
1149                    tracing::error!(
1150                        severity = 10,
1151                        error_category = "registration_error",
1152                        "❌ AIS registration response missing result"
1153                    );
1154                    return Err(ActrError::Unavailable(
1155                        "Invalid AIS registration response: missing result".to_string(),
1156                    ));
1157                }
1158            }
1159        };
1160
1161        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1162        // 3. Set credential on signaling client, then connect signaling WS
1163        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1164        // The signaling server requires credential params in the WS URL for
1165        // authentication. We must set actor_id + credential BEFORE connecting
1166        // so that build_url_with_identity() includes them in the query string.
1167        let pre_connect_credential_state = {
1168            let actor_id = register_ok.actr_id.clone();
1169            let credential_state = CredentialState::new(
1170                register_ok.credential.clone(),
1171                register_ok.credential_expires_at,
1172                Some(register_ok.turn_credential.clone()),
1173            );
1174            self.signaling_client.set_actor_id(actor_id).await;
1175            self.signaling_client
1176                .set_credential_state(credential_state.clone())
1177                .await;
1178            credential_state
1179        };
1180
1181        // Install the signaling-side hook callback so that
1182        // SignalingConnectStart / Connected / Disconnected events flow
1183        // through the framework tracing defaults and into a
1184        // user-installed observer. Done BEFORE connect() so the initial
1185        // attempt produces a SignalingConnectStart event.
1186        {
1187            let actor_id = register_ok.actr_id.clone();
1188            let credential_state = pre_connect_credential_state.clone();
1189            // Snapshot at this point — outproc_gate is still None here, so
1190            // signaling-event contexts will carry None for outproc_gate
1191            // (matching the pre-existing behavior prior to B13 refactor).
1192            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1193            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1194                let snapshot = ctx_builder_snapshot.clone();
1195                let actor_id = actor_id.clone();
1196                let credential_state = credential_state.clone();
1197                Box::pin(async move {
1198                    Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1199                })
1200            });
1201            let cb = crate::lifecycle::hooks::build_hook_callback(
1202                self.hook_observer.clone(),
1203                ctx_builder,
1204            );
1205            self.signaling_client.set_hook_callback(cb);
1206        }
1207
1208        tracing::info!("📡 Connecting to signaling server (with credential)");
1209        self.signaling_client
1210            .connect()
1211            .await
1212            .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1213        tracing::info!("✅ Connected to signaling server");
1214
1215        // Collect background task handles so they can be managed by ActrRefShared later.
1216        let mut task_handles = Vec::new();
1217
1218        // Node-level hook callback, built inside the registration
1219        // setup block below and published back out into this wider
1220        // scope so the mailbox backpressure watchdog can subscribe.
1221        let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1222
1223        {
1224            let actor_id = register_ok.actr_id;
1225            let credential = register_ok.credential;
1226
1227            tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1228            tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1229            tracing::info!(
1230                "💓 Signaling heartbeat interval: {} seconds",
1231                register_ok.signaling_heartbeat_interval_secs
1232            );
1233
1234            // TurnCredential is a required field; should always be present under normal registration.
1235            tracing::debug!("TurnCredential received, TURN authentication ready");
1236
1237            if let Some(expires_at) = &register_ok.credential_expires_at {
1238                tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1239            }
1240
1241            // Store ActrId and credential state
1242            self.actor_id = Some(actor_id.clone());
1243            let credential_state = CredentialState::new(
1244                credential,
1245                register_ok.credential_expires_at,
1246                Some(register_ok.turn_credential.clone()),
1247            );
1248            self.credential_state = Some(credential_state.clone());
1249
1250            // Build the node-level lifecycle hook callback once: it is
1251            // reused for the initial `on_credential_renewed`, handed to
1252            // the heartbeat task for subsequent credential events, and
1253            // handed to the mailbox backpressure watchdog for
1254            // `on_mailbox_backpressure` on rising-edge crossings.
1255            //
1256            // The signaling layer already has its own callback installed
1257            // above — this second callback only carries credential and
1258            // mailbox-backpressure events, so no overlap with the
1259            // signaling-event plumbing.
1260            node_hook_callback =
1261                {
1262                    let actor_id_for_hook = actor_id.clone();
1263                    let credential_state_for_hook = credential_state.clone();
1264                    // Snapshot at this point — outproc_gate is still None
1265                    // here; credential / mailbox hook contexts inherit that
1266                    // and therefore cannot issue Dest::Actor(_) calls (same
1267                    // behavior as before B13 refactor).
1268                    let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1269                    let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1270                        Arc::new(move || {
1271                            let snapshot = ctx_builder_snapshot.clone();
1272                            let actor_id = actor_id_for_hook.clone();
1273                            let credential_state = credential_state_for_hook.clone();
1274                            Box::pin(async move {
1275                                Some(snapshot.build_bootstrap(
1276                                    &actor_id,
1277                                    &credential_state.credential().await,
1278                                ))
1279                            })
1280                        });
1281                    Some(crate::lifecycle::hooks::build_hook_callback(
1282                        self.hook_observer.clone(),
1283                        ctx_builder,
1284                    ))
1285                };
1286
1287            // Fire `on_credential_renewed` at initial registration: the
1288            // credential is considered "renewed" from "nothing" to the
1289            // value just issued by AIS. Subsequent renewals fire the
1290            // same hook from `lifecycle::heartbeat`.
1291            if let Some(expires_at) = &register_ok.credential_expires_at {
1292                let new_expiry = std::time::UNIX_EPOCH
1293                    + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1294                if let Some(cb) = node_hook_callback.as_ref() {
1295                    cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1296                } else {
1297                    tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1298                }
1299            }
1300
1301            // Note: actor_id and credential_state were already set on signaling_client
1302            // before connect (step 3 above), so reconnect URLs already carry correct auth.
1303
1304            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1305            // 1.3. Inproc transports were filled in during `build()`; nothing
1306            //      to stage here now that ContextFactory has been removed.
1307            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1308            tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1309
1310            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1311            // 1.5. Create WebRTC infrastructure
1312            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1313            tracing::info!("🌐 Initializing WebRTC infrastructure");
1314
1315            let media_frame_registry = self.media_frame_registry.clone();
1316
1317            // Create WebRtcCoordinator
1318            let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1319                actor_id.clone(),
1320                credential_state.clone(),
1321                self.signaling_client.clone(),
1322                self.config.webrtc.clone(),
1323                media_frame_registry,
1324            ));
1325
1326            // Install the WebRTC hook callback — fires
1327            // WebRtcConnectStart / Connected (with relayed info) /
1328            // Disconnected HookEvents on every peer state change.
1329            {
1330                let actor_id_for_hook = actor_id.clone();
1331                let credential_state_for_hook = credential_state.clone();
1332                // Snapshot before outproc_gate is wired up (just below). This
1333                // preserves the pre-refactor behavior where WebRTC-event
1334                // hook contexts carry outproc_gate = None.
1335                let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1336                let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1337                    Arc::new(move || {
1338                        let snapshot = ctx_builder_snapshot.clone();
1339                        let actor_id = actor_id_for_hook.clone();
1340                        let credential_state = credential_state_for_hook.clone();
1341                        Box::pin(async move {
1342                            Some(
1343                                snapshot.build_bootstrap(
1344                                    &actor_id,
1345                                    &credential_state.credential().await,
1346                                ),
1347                            )
1348                        })
1349                    });
1350                let cb = crate::lifecycle::hooks::build_hook_callback(
1351                    self.hook_observer.clone(),
1352                    ctx_builder,
1353                );
1354                coordinator.set_hook_callback(cb);
1355            }
1356
1357            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1358            // 1.6. Create PeerTransport + PeerGate (new architecture)
1359            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1360            tracing::info!("🏗️  Creating PeerTransport with WebRTC support");
1361
1362            // Pre-allocate the pending-requests map so it can be shared between
1363            // DefaultWireBuilder (for outbound WS response reader tasks) and
1364            // PeerGate (for request/response matching).
1365            let pending_requests: crate::outbound::PendingRequestsMap =
1366                Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
1367
1368            // Create DefaultWireBuilder with WebRTC coordinator
1369            use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1370
1371            // WebSocket channel always enabled: target ws:// address is fully discovered at runtime
1372            // Direct-connect mode: encode local node ActrId as hex, sent as X-Actr-Node-Id
1373            let local_id_hex = hex::encode(actor_id.encode_to_vec());
1374            let wire_builder_config = DefaultWireBuilderConfig {
1375                local_id_hex,
1376                enable_webrtc: true,
1377                enable_websocket: true,
1378                // Share the discovered_ws_addresses map so that post-discovery calls
1379                // can use the signaling-provided ws:// URL for this actor node.
1380                discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1381                // Pass credential_state so outbound WS handshake carries X-Actr-Credential,
1382                // enabling peer WebSocketGate to perform Ed25519 signature verification.
1383                credential_state: Some(credential_state.clone()),
1384                // Pass pending_requests so outbound WS connections spawn reader tasks
1385                // to deliver server responses back to `send_request_with_type` futures.
1386                pending_requests: Some(pending_requests.clone()),
1387            };
1388            let wire_builder = Arc::new(DefaultWireBuilder::new(
1389                Some(coordinator.clone()),
1390                wire_builder_config,
1391            ));
1392
1393            // Create PeerTransport
1394            use crate::transport::PeerTransport;
1395            let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1396            self.peer_transport = Some(transport_manager.clone());
1397
1398            // Create PeerGate with the pre-allocated pending_requests map and WebRTC coordinator.
1399            use crate::outbound::PeerGate;
1400            let outproc_gate = Arc::new(PeerGate::with_pending_requests(
1401                transport_manager,
1402                Some(coordinator.clone()),
1403                pending_requests.clone(),
1404            ));
1405            let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1406            tracing::info!("PeerTransport + PeerGate initialized");
1407
1408            let data_stream_registry = self.data_stream_registry.clone();
1409
1410            // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1411            let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1412                coordinator.clone(),
1413                pending_requests,
1414                data_stream_registry.clone(),
1415            ));
1416            // Set local_id
1417            gate.set_local_id(actor_id.clone()).await;
1418            tracing::info!(
1419                "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1420            );
1421
1422            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1423            // 1.7. Wire the outproc gate into Inner so subsequent
1424            //      `make_runtime_context` / `bootstrap_ctx_builder` calls
1425            //      observe it. All per-request contexts created by
1426            //      `handle_incoming` go through this field live.
1427            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1428            tracing::info!("🔧 Wiring outproc_gate into node");
1429            self.outproc_gate = Some(outproc_gate_enum);
1430            tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1431
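            // Keep the coordinator and gate on Inner so the heartbeat/unregister tasks
            // and the background loops started later can reach them.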
1433            self.webrtc_coordinator = Some(coordinator.clone());
1434            self.webrtc_gate = Some(gate.clone());
1435            tracing::info!("✅ WebRTC infrastructure initialized");
1436
1437            // Fire `on_start` once the runtime context can see the initialized
1438            // gates, before starting request-accepting/background loops. Its
1439            // Err/panic aborts Node::start.
1440            {
1441                let startup_ctx = self
1442                    .bootstrap_ctx_builder()
1443                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1444                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1445                let call_executor =
1446                    lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1447                let mut workload = self.workload_dispatch.lock().await;
1448                crate::lifecycle::hooks::call_lifecycle_hook(
1449                    "on_start",
1450                    workload.on_start(startup_ctx, invocation, &call_executor),
1451                )
1452                .await?;
1453            }
1454
1455            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1456            // 1.7.6. WebSocket Server (direct-connect mode, optional)
1457            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1458            if let Some(listen_port) = self.config.websocket_listen_port {
1459                tracing::info!(
1460                    "🔌 WebSocket direct-connect mode enabled, binding port {}",
1461                    listen_port
1462                );
1463                use crate::key_cache::AisKeyCache;
1464                use crate::wire::websocket::gate::WsAuthContext;
1465                use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1466
1467                // Build AisKeyCache and seed it with the signing key from the registration response
1468                let ais_key_cache = AisKeyCache::new();
1469                if !register_ok.signing_pubkey.is_empty() {
1470                    match ais_key_cache
1471                        .seed(register_ok.signing_key_id, &register_ok.signing_pubkey)
1472                        .await
1473                    {
1474                        Ok(()) => tracing::info!(
1475                            key_id = register_ok.signing_key_id,
1476                            "🔑 AisKeyCache seeded from RegisterOk"
1477                        ),
1478                        Err(e) => tracing::warn!(
1479                            key_id = register_ok.signing_key_id,
1480                            error = ?e,
1481                            "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1482                        ),
1483                    }
1484                } else {
1485                    tracing::warn!(
1486                        "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1487                    );
1488                }
1489
1490                let auth_ctx = WsAuthContext {
1491                    ais_key_cache,
1492                    actor_id: actor_id.clone(),
1493                    credential_state: credential_state.clone(),
1494                    signaling_client: self.signaling_client.clone(),
1495                };
1496
1497                match WebSocketServer::bind(listen_port).await {
1498                    Ok((ws_server, conn_rx)) => {
1499                        ws_server.start(self.shutdown_token.clone());
1500                        let ws_gate = Arc::new(WebSocketGate::new(
1501                            conn_rx,
1502                            outproc_gate.get_pending_requests(),
1503                            data_stream_registry.clone(),
1504                            Some(auth_ctx),
1505                        ));
1506
1507                        // Install the WebSocket peer-lifecycle hook.
1508                        {
1509                            let actor_id_for_hook = actor_id.clone();
1510                            let credential_state_for_hook = credential_state.clone();
1511                            // Snapshot taken after outproc_gate is live: ws
1512                            // peer-lifecycle hook contexts can issue
1513                            // Dest::Actor(_) calls.
1514                            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1515                            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1516                                Arc::new(move || {
1517                                    let snapshot = ctx_builder_snapshot.clone();
1518                                    let actor_id = actor_id_for_hook.clone();
1519                                    let credential_state = credential_state_for_hook.clone();
1520                                    Box::pin(async move {
1521                                        Some(snapshot.build_bootstrap(
1522                                            &actor_id,
1523                                            &credential_state.credential().await,
1524                                        ))
1525                                    })
1526                                });
1527                            let cb = crate::lifecycle::hooks::build_hook_callback(
1528                                self.hook_observer.clone(),
1529                                ctx_builder,
1530                            );
1531                            ws_gate.set_hook_callback(cb);
1532                        }
1533
1534                        self.websocket_gate = Some(ws_gate);
1535                        tracing::info!(
1536                            "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1537                        );
1538                    }
1539                    Err(e) => {
1540                        tracing::error!(
1541                            "❌ Failed to bind WebSocket server on port {}: {:?}",
1542                            listen_port,
1543                            e
1544                        );
1545                    }
1546                }
1547            }
1548
1549            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1550            // 1.7.5. Create shared state for credential management
1551            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
            // Nothing is created here: the shared `credential_state` already in scope
            // is cloned into each background task spawned below.
1553
1554            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1555            // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1556            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1557            {
1558                let shutdown = self.shutdown_token.clone();
1559                let client = self.signaling_client.clone();
1560                let actor_id_for_heartbeat = actor_id.clone();
1561                let credential_state_for_heartbeat = credential_state.clone();
1562                let mailbox_for_heartbeat = self.mailbox.clone();
1563                let register_request_for_heartbeat = register_request.clone();
1564                let webrtc_coordinator_for_heartbeat = self.webrtc_coordinator.clone();
1565                let webrtc_gate_for_heartbeat = self.webrtc_gate.clone();
1566
1567                // Use interval from registration response, default to 30s
1568                let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1569                let heartbeat_interval = if heartbeat_interval_secs > 0 {
1570                    Duration::from_secs(heartbeat_interval_secs as u64)
1571                } else {
1572                    Duration::from_secs(30)
1573                };
1574                let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1575                let realm_secret_for_heartbeat = self.config.realm_secret.clone();
1576                let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1577                    shutdown,
1578                    client,
1579                    actor_id_for_heartbeat,
1580                    credential_state_for_heartbeat,
1581                    mailbox_for_heartbeat,
1582                    heartbeat_interval,
1583                    register_request_for_heartbeat,
1584                    ais_endpoint_for_heartbeat,
1585                    realm_secret_for_heartbeat,
1586                    node_hook_callback.clone(),
1587                    webrtc_coordinator_for_heartbeat,
1588                    webrtc_gate_for_heartbeat,
1589                ));
1590                task_handles.push(heartbeat_handle);
1591            }
1592            tracing::info!(
1593                "✅ Heartbeat task started (interval: {}s)",
1594                register_ok.signaling_heartbeat_interval_secs
1595            );
1596
1597            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1598            // 1.8.5. Spawn network event processing loop
1599            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1600            if let Some(event_rx) = self.network_event_rx.take() {
1601                use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1602
1603                // Create DefaultNetworkEventProcessor
1604                // If debounce config exists, use new_with_debounce
1605                let event_processor =
1606                    if let Some(config) = self.network_event_debounce_config.clone() {
1607                        Arc::new(
1608                            DefaultNetworkEventProcessor::new_with_debounce_and_peer_transport(
1609                                self.signaling_client.clone(),
1610                                self.webrtc_coordinator.clone(),
1611                                config,
1612                                self.peer_transport.clone(),
1613                            ),
1614                        )
1615                    } else {
1616                        Arc::new(DefaultNetworkEventProcessor::new_with_peer_transport(
1617                            self.signaling_client.clone(),
1618                            self.webrtc_coordinator.clone(),
1619                            self.peer_transport.clone(),
1620                        ))
1621                    };
1622
1623                let shutdown = self.shutdown_token.clone();
1624                let network_event_handle = tokio::spawn(async move {
1625                    Self::network_event_loop(event_rx, event_processor, shutdown).await;
1626                });
1627                task_handles.push(network_event_handle);
1628                tracing::info!("network_event.node.loop_started");
1629            } else {
1630                tracing::debug!("network_event.node.loop_not_started_no_handle");
1631            }
1632
1633            {
1634                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1635                // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1636                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1637                //
1638                // This task:
1639                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1640                // - Then sends UnregisterRequest via signaling client with a timeout
1641                //
1642                // NOTE: we push its JoinHandle into task_handles so it can be aborted
1643                // by ActrRefShared::Drop if needed.
1644                let shutdown = self.shutdown_token.clone();
1645                let client = self.signaling_client.clone();
1646                let actor_id_for_unreg = actor_id.clone();
1647                let credential_state_for_unreg = credential_state.clone();
1648                let webrtc_coordinator = self.webrtc_coordinator.clone();
1649
1650                let unregister_handle = tokio::spawn(async move {
1651                    // Wait for shutdown signal
1652                    shutdown.cancelled().await;
1653                    tracing::info!(
1654                        "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1655                        actor_id_for_unreg
1656                    );
1657
1658                    // 1. Close all WebRTC peer connections first (if any)
1659                    if let Some(coord) = webrtc_coordinator {
1660                        if let Err(e) = coord.close_all_peers().await {
1661                            tracing::warn!(
1662                                "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1663                                e
1664                            );
1665                        } else {
1666                            tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1667                        }
1668                    } else {
1669                        tracing::debug!(
1670                            "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1671                        );
1672                    }
1673
1674                    // 2. Then send UnregisterRequest with a timeout (e.g. 5 seconds)
1675                    let result = tokio::time::timeout(
1676                        Duration::from_secs(5),
1677                        client.send_unregister_request(
1678                            actor_id_for_unreg.clone(),
1679                            credential_state_for_unreg.credential().await,
1680                            Some("Graceful shutdown".to_string()),
1681                        ),
1682                    )
1683                    .await;
1684                    tracing::info!("UnregisterRequest result: {:?}", result);
1685                    match result {
1686                        Ok(Ok(_)) => {
1687                            tracing::info!(
1688                                "✅ UnregisterRequest sent to signaling server for Actor {}",
1689                                actor_id_for_unreg
1690                            );
1691                        }
1692                        Ok(Err(e)) => {
1693                            tracing::warn!(
1694                                "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1695                                actor_id_for_unreg,
1696                                e
1697                            );
1698                        }
1699                        Err(_) => {
1700                            tracing::warn!(
1701                                "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1702                                actor_id_for_unreg
1703                            );
1704                        }
1705                    }
1706                });
1707
1708                task_handles.push(unregister_handle);
1709            }
1710        } // end registration setup block
1711
1712        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1713        // 2. Transport layer initialization (completed via WebRTC infrastructure)
1714        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1715        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1716
1717        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1718        // 3.1 Convert to Arc (before starting background loops)
1719        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1720        // Clone actor_id before moving self into Arc
1721        let actor_id = self
1722            .actor_id
1723            .as_ref()
1724            .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1725            .clone();
1726        // Snapshot now that outproc_gate has been wired above; this builder
1727        // is shared between on_start / on_stop hooks and the ActrRefShared
1728        // handle returned to the caller.
1729        let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1730        let credential_state = self
1731            .credential_state
1732            .clone()
1733            .expect("CredentialState must be initialized in start()");
1734        let shutdown_token = self.shutdown_token.clone();
1735        let node_ref = Arc::new(self);
1736
1737        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1738        // 3.2. Register workload-level stop hook.
1739        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1740        {
1741            let node = node_ref.clone();
1742            let actor_id = actor_id.clone();
1743            let credential_state = credential_state.clone();
1744            let shutdown = shutdown_token.clone();
1745            let on_stop_handle = tokio::spawn(async move {
1746                shutdown.cancelled().await;
1747                let stop_ctx = node
1748                    .bootstrap_ctx_builder()
1749                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1750                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1751                let call_executor =
1752                    lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1753                let mut workload = node.workload_dispatch.lock().await;
1754                if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1755                    "on_stop",
1756                    workload.on_stop(stop_ctx, invocation, &call_executor),
1757                )
1758                .await
1759                {
1760                    tracing::warn!(error = %e, "workload on_stop returned Err");
1761                }
1762            });
1763            task_handles.push(on_stop_handle);
1764        }
1765
1766        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1767        // 3.5. Start WebRTC background loops
1768        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1769        tracing::info!("🚀 Starting WebRTC background loops");
1770
1771        // Start WebRtcCoordinator signaling loop
1772        if let Some(coordinator) = &node_ref.webrtc_coordinator {
1773            coordinator.clone().start().await.map_err(|e| {
1774                ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1775            })?;
1776            tracing::info!("✅ WebRtcCoordinator signaling loop started");
1777        }
1778
1779        // Start WebRtcGate message receive loop (route to Mailbox)
1780        if let Some(gate) = &node_ref.webrtc_gate {
1781            gate.start_receive_loop(node_ref.mailbox.clone())
1782                .await
1783                .map_err(|e| {
1784                    ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1785                })?;
1786            tracing::info!("✅ WebRtcGate → Mailbox routing started");
1787        }
1788
1789        // Start WebSocketGate message receive loop (route to Mailbox, direct-connect mode)
1790        if let Some(ws_gate) = &node_ref.websocket_gate {
1791            ws_gate
1792                .start_receive_loop(node_ref.mailbox.clone())
1793                .await
1794                .map_err(|e| {
1795                    ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1796                })?;
1797            tracing::info!("✅ WebSocketGate → Mailbox routing started");
1798        }
1799        tracing::info!("✅ WebRTC background loops started");
1800
1801        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1802        // 4.6. Start Inproc receive loop (Shell → Guest)
1803        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1804        if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1805            tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1806            // Start Guest receive loop (Shell → Guest REQUEST)
1807            if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1808                let node = node_ref.clone();
1809                let request_rx_lane = shell_to_workload
1810                    .get_lane(PayloadType::RpcReliable, None)
1811                    .await
1812                    .map_err(|e| {
1813                        ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1814                    })?;
1815                let response_tx = workload_to_shell.clone();
1816                let shutdown = shutdown_token.clone();
1817
1818                let inproc_handle = tokio::spawn(async move {
1819                    loop {
1820                        tokio::select! {
1821                            _ = shutdown.cancelled() => {
1822                                tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1823                                break;
1824                            }
1825                            envelope_result = request_rx_lane.recv_envelope() => {
1826                                match envelope_result {
1827                                    Ok(envelope) => {
1828                                        let request_id = envelope.request_id.clone();
1829                                        tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1830                                        // Extract and set tracing context from envelope
1831                                        #[cfg(feature = "opentelemetry")]
1832                                        let span = {
1833                                            let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1834                                            let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1835                                            set_parent_from_rpc_envelope(&span, &envelope);
1836                                            span
1837                                        };
1838
1839                                        // Shell calls have no caller_id (local process communication)
1840                                        let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1841                                        #[cfg(feature = "opentelemetry")]
1842                                        let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1843
1844                                        match handle_incoming_fut.await {
1845                                            Ok(response_bytes) => {
1846                                                // Send RESPONSE back via workload_to_shell
1847                                                // Keep same route_key (no prefix needed - separate channels!)
1848                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1849                                                let mut response_envelope = RpcEnvelope {
1850                                                    route_key: envelope.route_key.clone(),
1851                                                    payload: Some(response_bytes),
1852                                                    error: None,
1853                                                    traceparent: None,
1854                                                    tracestate: None,
1855                                                    request_id: request_id.clone(),
1856                                                    metadata: Vec::new(),
1857                                                    timeout_ms: 30000,
1858                                                };
1859                                                // Inject tracing context
1860                                                #[cfg(feature = "opentelemetry")]
1861                                                inject_span_context_to_rpc(&span, &mut response_envelope);
1862
1863                                                // Send via Guest → Shell channel
1864                                                let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1865                                                #[cfg(feature = "opentelemetry")]
1866                                                let send_response_fut = send_response_fut.instrument(span.clone());
1867                                                if let Err(e) = send_response_fut.await {
1868                                                    tracing::error!(
1869                                                        severity = 7,
1870                                                        error_category = "transport_error",
1871                                                        request_id = %request_id,
1872                                                        "❌ Failed to send RESPONSE to Shell: {:?}",
1873                                                        e
1874                                                    );
1875                                                }
1876                                            }
1877                                            Err(e) => {
1878                                                tracing::error!(
1879                                                    severity = 6,
1880                                                    error_category = "handler_error",
1881                                                    request_id = %request_id,
1882                                                    route_key = %envelope.route_key,
1883                                                    "❌ Guest message handling failed: {:?}",
1884                                                    e
1885                                                );
1886
1887                                                // Send error response (system-level error on envelope)
1888                                                let error_response = actr_protocol::ErrorResponse {
1889                                                    code: protocol_error_to_code(&e),
1890                                                    message: e.to_string(),
1891                                                };
1892                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1893                                                let mut error_envelope = RpcEnvelope {
1894                                                    route_key: envelope.route_key.clone(),
1895                                                    payload: None,
1896                                                    error: Some(error_response),
1897                                                    traceparent: envelope.traceparent.clone(),
1898                                                    tracestate: envelope.tracestate.clone(),
1899                                                    request_id: request_id.clone(),
1900                                                    metadata: Vec::new(),
1901                                                    timeout_ms: 30000,
1902                                                };
1903                                                // Inject tracing context
1904                                                #[cfg(feature = "opentelemetry")]
1905                                                inject_span_context_to_rpc(&span, &mut error_envelope);
1906
1907                                                let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1908                                                #[cfg(feature = "opentelemetry")]
1909                                                let send_error_response_fut = send_error_response_fut.instrument(span);
1910                                                if let Err(send_err) = send_error_response_fut.await {
1911                                                    tracing::error!(
1912                                                        severity = 7,
1913                                                        error_category = "transport_error",
1914                                                        request_id = %request_id,
1915                                                        "❌ Failed to send ERROR response to Shell: {:?}",
1916                                                        send_err
1917                                                    );
1918                                                }
1919                                            }
1920                                        }
1921                                    }
1922                                    Err(e) => {
1923                                        tracing::error!(
1924                                            severity = 8,
1925                                            error_category = "transport_error",
1926                                            "❌ Failed to receive from Shell → Guest lane: {:?}",
1927                                            e
1928                                        );
1929                                        break;
1930                                    }
1931                                }
1932                            }
1933                        }
1934                    }
1935                    tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1936                });
1937                task_handles.push(inproc_handle);
1938            }
1939        }
1940        tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1941
1942        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1943        // 4.7. Start Shell receive loop (Guest → Shell RESPONSE)
1944        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1945        tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1946        if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1947            // Start Shell receive loop (Guest → Shell RESPONSE)
1948            if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1949                let response_rx_lane = workload_to_shell
1950                    .get_lane(PayloadType::RpcReliable, None)
1951                    .await
1952                    .map_err(|e| {
1953                        ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1954                    })?;
1955                let request_mgr = shell_to_workload.clone();
1956                let shutdown = shutdown_token.clone();
1957
1958                let shell_receive_handle = tokio::spawn(async move {
1959                    loop {
1960                        tokio::select! {
1961                            _ = shutdown.cancelled() => {
1962                                tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1963                                break;
1964                            }
1965                            envelope_result = response_rx_lane.recv_envelope() => {
1966                                match envelope_result {
1967                                    Ok(envelope) => {
1968                                        tracing::debug!(
1969                                            "📨 Shell received RESPONSE from Guest: request_id={}",
1970                                            envelope.request_id
1971                                        );
1972
1973                                        // Check if response is success or error
1974                                        match (envelope.payload, envelope.error) {
1975                                            (Some(payload), None) => {
1976                                                // Success response
1977                                                if let Err(e) = request_mgr
1978                                                    .complete_response(&envelope.request_id, payload)
1979                                                    .await
1980                                                {
1981                                                    tracing::warn!(
1982                                                        severity = 4,
1983                                                        error_category = "orphan_response",
1984                                                        request_id = %envelope.request_id,
1985                                                        "⚠️  No pending request found for response: {:?}",
1986                                                        e
1987                                                    );
1988                                                }
1989                                            }
1990                                            (None, Some(error)) => {
1991                                                // Error response — reconstruct the precise ActrError variant
1992                                                // from the wire code so binding-visible classification
1993                                                // (UnknownRoute / PermissionDenied / TimedOut / …) is preserved
1994                                                // instead of collapsing every error into Unavailable.
1995                                                let actr_err = wire_code_to_actr_error(error.code, error.message);
1996                                                if let Err(e) = request_mgr
1997                                                    .complete_error(&envelope.request_id, actr_err)
1998                                                    .await
1999                                                {
2000                                                    tracing::warn!(
2001                                                        severity = 4,
2002                                                        error_category = "orphan_response",
2003                                                        request_id = %envelope.request_id,
2004                                                        "⚠️  No pending request found for error response: {:?}",
2005                                                        e
2006                                                    );
2007                                                }
2008                                            }
2009                                            _ => {
2010                                                tracing::error!(
2011                                                    severity = 7,
2012                                                    error_category = "protocol_error",
2013                                                    request_id = %envelope.request_id,
2014                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
2015                                                );
2016                                            }
2017                                        }
2018                                    }
2019                                    Err(e) => {
2020                                        tracing::error!(
2021                                            severity = 8,
2022                                            error_category = "transport_error",
2023                                            "❌ Failed to receive from Guest → Shell lane: {:?}",
2024                                            e
2025                                        );
2026                                        break;
2027                                    }
2028                                }
2029                            }
2030                        }
2031                    }
2032                    tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
2033                });
2034                task_handles.push(shell_receive_handle);
2035            }
2036        }
2037        tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
2038
2039        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2040        // 4.9. Mailbox backpressure watchdog
2041        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2042        //
2043        // Emits the framework `on_mailbox_backpressure` hook once per
2044        // rising-edge crossing of the configured threshold.
2045        //
2046        // Preferred path: a push-based notification from the mailbox
2047        // backend via [`Mailbox::set_depth_observer`], which runs
2048        // synchronously on every enqueue and has zero worst-case delay.
2049        //
2050        // Fallback path: mailbox backends without depth support (or
2051        // which can't cheaply compute depth on every enqueue) keep
2052        // using a 1 Hz poll of [`Mailbox::status`].
2053        let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
2054        {
2055            use std::sync::atomic::{AtomicBool, Ordering};
2056            let mailbox = node_ref.mailbox.clone();
2057            let shutdown = shutdown_token.clone();
2058            let hook_cb = node_hook_callback.clone();
2059            let triggered = Arc::new(AtomicBool::new(false));
2060
2061            // Shared rising-edge state + hook-firing closure used by
2062            // both the push and polling code paths.
2063            let fire_if_rising = {
2064                let triggered = triggered.clone();
2065                let hook_cb = hook_cb.clone();
2066                Arc::new(move |queue_len: usize| {
2067                    if queue_len >= backpressure_threshold {
2068                        if !triggered.swap(true, Ordering::AcqRel) {
2069                            if let Some(cb) = hook_cb.as_ref() {
2070                                let cb = cb.clone();
2071                                tokio::spawn(async move {
2072                                    cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
2073                                        queue_len,
2074                                        threshold: backpressure_threshold,
2075                                    })
2076                                    .await;
2077                                });
2078                            } else {
2079                                tracing::warn!(
2080                                    queue_len,
2081                                    threshold = backpressure_threshold,
2082                                    "mailbox backpressure",
2083                                );
2084                            }
2085                        }
2086                    } else if triggered.swap(false, Ordering::AcqRel) {
2087                        tracing::info!(
2088                            queue_len,
2089                            threshold = backpressure_threshold,
2090                            "mailbox backpressure cleared",
2091                        );
2092                    }
2093                })
2094            };
2095
2096            // Try the push path first. The observer installs only if
2097            // the backend supports it; otherwise `installed` is `false`
2098            // and we fall through to polling.
2099            struct EnqueueObserver {
2100                fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2101            }
2102            impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2103                fn on_depth_change(&self, queued_messages: usize) {
2104                    (self.fire)(queued_messages);
2105                }
2106            }
2107
2108            let installed = {
2109                let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2110                    Arc::new(EnqueueObserver {
2111                        fire: fire_if_rising.clone(),
2112                    });
2113                mailbox.set_depth_observer(observer)
2114            };
2115
2116            if installed {
2117                tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2118            } else {
2119                tracing::debug!(
2120                    "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2121                );
2122                let mailbox_for_poll = mailbox.clone();
2123                let shutdown_for_poll = shutdown.clone();
2124                let fire_for_poll = fire_if_rising.clone();
2125                let watchdog_handle = tokio::spawn(async move {
2126                    let mut ticker = tokio::time::interval(Duration::from_secs(1));
2127                    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2128                    loop {
2129                        tokio::select! {
2130                            _ = shutdown_for_poll.cancelled() => {
2131                                tracing::debug!(
2132                                    "mailbox backpressure watchdog shutting down"
2133                                );
2134                                break;
2135                            }
2136                            _ = ticker.tick() => {
2137                                let status = match mailbox_for_poll.status().await {
2138                                    Ok(s) => s,
2139                                    Err(e) => {
2140                                        tracing::debug!(?e, "mailbox status poll failed");
2141                                        continue;
2142                                    }
2143                                };
2144                                fire_for_poll(status.queued_messages as usize);
2145                            }
2146                        }
2147                    }
2148                });
2149                task_handles.push(watchdog_handle);
2150            }
2151        }
2152
2153        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2154        // 5. Start Mailbox processing loop (State Path)
2155        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2156        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2157        {
2158            let node = node_ref.clone();
2159            let mailbox = node_ref.mailbox.clone();
2160            let webrtc_gate = node_ref.webrtc_gate.clone();
2161            let ws_gate = node_ref.websocket_gate.clone();
2162            let shutdown = shutdown_token.clone();
2163
2164            let mailbox_handle = tokio::spawn(async move {
2165                loop {
2166                    tokio::select! {
2167                        // Listen for shutdown signal
2168                        _ = shutdown.cancelled() => {
2169                            tracing::info!("📭 Mailbox loop received shutdown signal");
2170                            break;
2171                        }
2172                        // Dequeue messages (by priority)
2173                        result = mailbox.dequeue() => {
2174                            match result {
2175                                Ok(messages) => {
2176                                    if messages.is_empty() {
2177                                        // Queue empty, sleep briefly
2178                                        tokio::time::sleep(Duration::from_millis(10)).await;
2179                                        continue;
2180                                    }
2181                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2182
2183                                    // Process messages one by one
2184                                    for msg_record in messages {
2185                                        // Deserialize RpcEnvelope (Protobuf)
2186                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
2187                                            Ok(envelope) => {
2188                                                let request_id = envelope.request_id.clone();
2189                                                let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2190                                                tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2191
2192                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
2193                                                #[cfg(feature = "opentelemetry")]
2194                                                let span = {
2195                                                    let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2196                                                    let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2197                                                    set_parent_from_rpc_envelope(&span, &envelope);
2198                                                    span
2199                                                };
2200
2201                                                // Decode caller_id from MessageRecord.from (transport layer)
2202                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
2203                                                let caller_id_ref = caller_id_result.as_ref().ok();
2204
2205                                                if caller_id_ref.is_none() {
2206                                                    tracing::warn!(
2207                                                        request_id = %request_id,
2208                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
2209                                                    );
2210                                                }
2211
2212                                                // Call handle_incoming with caller_id from transport layer
2213                                                let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2214                                                #[cfg(feature = "opentelemetry")]
2215                                                let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2216
2217                                                /// Send `response_envelope` back to `caller` via the
2218                                                /// best available transport.
2219                                                ///
2220                                                /// Priority: inbound WebSocket connection (if caller
2221                                                /// dialled us directly) → WebRTC gate.  Returns the
2222                                                /// first transport error encountered, if any.
2223                                                async fn send_envelope_to_caller(
2224                                                    ws_gate: &Option<Arc<crate::wire::websocket::WebSocketGate>>,
2225                                                    webrtc_gate: &Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
2226                                                    caller: &ActrId,
2227                                                    response_envelope: RpcEnvelope,
2228                                                    request_id: &str,
2229                                                ) {
2230                                                    // 1. Try inbound WebSocket connection first.
2231                                                    if let Some(wsg) = ws_gate {
2232                                                        match wsg.send_response(caller, response_envelope.clone()).await {
2233                                                            Ok(true) => return, // sent successfully
2234                                                            Ok(false) => {
2235                                                                tracing::debug!(
2236                                                                    request_id = request_id,
2237                                                                    caller = %caller,
2238                                                                    "No inbound WS connection for caller; falling back to WebRTC gate"
2239                                                                );
2240                                                            }
2241                                                            Err(e) => {
2242                                                                tracing::warn!(
2243                                                                    severity = 5,
2244                                                                    error_category = "transport_error",
2245                                                                    request_id = request_id,
2246                                                                    "WebSocketGate send_response failed, falling back: {:?}", e
2247                                                                );
2248                                                            }
2249                                                        }
2250                                                    }
2251
2252                                                    // 2. Fall back to WebRTC gate.
2253                                                    if let Some(gate) = webrtc_gate {
2254                                                        if let Err(e) = gate.send_response(caller, response_envelope).await {
2255                                                            tracing::error!(
2256                                                                severity = 7,
2257                                                                error_category = "transport_error",
2258                                                                request_id = request_id,
2259                                                                "❌ WebRtcGate send_response failed: {:?}", e
2260                                                            );
2261                                                        }
2262                                                    } else {
2263                                                        tracing::error!(
2264                                                            severity = 7,
2265                                                            error_category = "transport_error",
2266                                                            request_id = request_id,
2267                                                            "❌ No gate available to send response"
2268                                                        );
2269                                                    }
2270                                                }
2271
2272                                                match handle_incoming_fut.await {
2273                                                    Ok(response_bytes) => {
2274                                                        match caller_id_result {
2275                                                            Ok(caller) => {
2276                                                                // Construct response RpcEnvelope (reuse request_id!)
2277                                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2278                                                                let mut response_envelope = RpcEnvelope {
2279                                                                    request_id: request_id.clone(),
2280                                                                    route_key: envelope.route_key.clone(),
2281                                                                    payload: Some(response_bytes),
2282                                                                    error: None,
2283                                                                    traceparent: envelope.traceparent.clone(),
2284                                                                    tracestate: envelope.tracestate.clone(),
2285                                                                    metadata: Vec::new(),
2286                                                                    timeout_ms: 30000,
2287                                                                };
2288                                                                // Inject tracing context
2289                                                                #[cfg(feature = "opentelemetry")]
2290                                                                inject_span_context_to_rpc(&span, &mut response_envelope);
2291
2292                                                                #[cfg(feature = "opentelemetry")]
2293                                                                let send_fut = send_envelope_to_caller(
2294                                                                    &ws_gate,
2295                                                                    &webrtc_gate,
2296                                                                    &caller,
2297                                                                    response_envelope,
2298                                                                    &request_id,
2299                                                                ).instrument(span);
2300                                                                #[cfg(not(feature = "opentelemetry"))]
2301                                                                let send_fut = send_envelope_to_caller(
2302                                                                    &ws_gate,
2303                                                                    &webrtc_gate,
2304                                                                    &caller,
2305                                                                    response_envelope,
2306                                                                    &request_id,
2307                                                                );
2308                                                                send_fut.await;
2309                                                            }
2310                                                            Err(e) => {
2311                                                                tracing::error!(
2312                                                                    severity = 8,
2313                                                                    error_category = "protobuf_decode",
2314                                                                    request_id = %envelope.request_id,
2315                                                                    "❌ Failed to decode caller_id: {:?}",
2316                                                                    e
2317                                                                );
2318                                                            }
2319                                                        }
2320
2321                                                        // ACK message
2322                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
2323                                                            tracing::error!(
2324                                                                severity = 9,
2325                                                                error_category = "mailbox_error",
2326                                                                request_id = %envelope.request_id,
2327                                                                message_id = %msg_record.id,
2328                                                                "❌ Mailbox ACK failed: {:?}",
2329                                                                e
2330                                                            );
2331                                                        }
2332                                                    }
2333                                                    Err(e) => {
2334                                                        tracing::error!(
2335                                                            severity = 6,
2336                                                            error_category = "handler_error",
2337                                                            request_id = %envelope.request_id,
2338                                                            route_key = %envelope.route_key,
2339                                                            "❌ handle_incoming failed: {:?}", e
2340                                                        );
2341
2342                                                        // Send error envelope back to caller so it
2343                                                        // receives a structured error rather than
2344                                                        // waiting until its deadline fires.
2345                                                        if let Ok(caller) = caller_id_result {
2346                                                            let error_response = actr_protocol::ErrorResponse {
2347                                                                code: protocol_error_to_code(&e),
2348                                                                message: e.to_string(),
2349                                                            };
2350                                                            #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2351                                                            let mut error_envelope = RpcEnvelope {
2352                                                                request_id: request_id.clone(),
2353                                                                route_key: envelope.route_key.clone(),
2354                                                                payload: None,
2355                                                                error: Some(error_response),
2356                                                                traceparent: envelope.traceparent.clone(),
2357                                                                tracestate: envelope.tracestate.clone(),
2358                                                                metadata: Vec::new(),
2359                                                                timeout_ms: 30000,
2360                                                            };
2361                                                            #[cfg(feature = "opentelemetry")]
2362                                                            inject_span_context_to_rpc(&span, &mut error_envelope);
2363
2364                                                            send_envelope_to_caller(
2365                                                                &ws_gate,
2366                                                                &webrtc_gate,
2367                                                                &caller,
2368                                                                error_envelope,
2369                                                                &request_id,
2370                                                            ).await;
2371                                                        }
2372
2373                                                        // ACK to avoid infinite retries
2374                                                        let _ = mailbox.ack(msg_record.id).await;
2375                                                    }
2376                                                }
2377                                            }
2378                                            Err(e) => {
2379                                                // Poison message - cannot decode RpcEnvelope
2380                                                tracing::error!(
2381                                                    severity = 9,
2382                                                    error_category = "protobuf_decode",
2383                                                    message_id = %msg_record.id,
2384                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2385                                                    e
2386                                                );
2387
2388                                                // Write to Dead Letter Queue
2389                                                use actr_runtime_mailbox::DlqRecord;
2390                                                use chrono::Utc;
2391                                                use uuid::Uuid;
2392
2393                                                let dlq_record = DlqRecord {
2394                                                    id: Uuid::new_v4(),
2395                                                    original_message_id: Some(msg_record.id.to_string()),
2396                                                    from: Some(msg_record.from.clone()),
2397                                                    to: node.actor_id.as_ref().map(|id| {
2398                                                        let mut buf = Vec::new();
2399                                                        id.encode(&mut buf).unwrap();
2400                                                        buf
2401                                                    }),
2402                                                    raw_bytes: msg_record.payload.clone(),
2403                                                    error_message: format!("Protobuf decode failed: {e}"),
2404                                                    error_category: "protobuf_decode".to_string(),
2405                                                    trace_id: format!("mailbox-{}", msg_record.id),
2406                                                    request_id: None,
2407                                                    created_at: Utc::now(),
2408                                                    redrive_attempts: 0,
2409                                                    last_redrive_at: None,
2410                                                    context: Some(format!(
2411                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
2412                                                        match msg_record.priority {
2413                                                            actr_runtime_mailbox::MessagePriority::High => "high",
2414                                                            actr_runtime_mailbox::MessagePriority::Normal => "normal",
2415                                                        }
2416                                                    )),
2417                                                };
2418
2419                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2420                                                    tracing::error!(
2421                                                        severity = 10,
2422                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2423                                                        dlq_err
2424                                                    );
2425                                                } else {
2426                                                    tracing::warn!(
2427                                                        severity = 9,
2428                                                        "☠️ Poison message moved to DLQ: message_id={}",
2429                                                        msg_record.id
2430                                                    );
2431                                                }
2432
2433                                                // ACK the poison message to remove from mailbox
2434                                                let _ = mailbox.ack(msg_record.id).await;
2435                                            }
2436                                        }
2437                                    }
2438                                }
2439                                Err(e) => {
2440                                    tracing::error!(
2441                                        severity = 9,
2442                                        error_category = "mailbox_error",
2443                                        "❌ Mailbox dequeue failed: {:?}", e
2444                                    );
2445                                    tokio::time::sleep(Duration::from_secs(1)).await;
2446                                }
2447                            }
2448                        }
2449                    }
2450                }
2451                tracing::info!("✅ Mailbox processing loop terminated gracefully");
2452            });
2453
2454            task_handles.push(mailbox_handle);
2455        }
2456        tracing::info!("✅ Mailbox processing loop started");
2457        tracing::info!("✅ ActrNode started successfully");
2458
2459        {
2460            let ready_ctx = bootstrap_ctx_builder
2461                .build_bootstrap(&actor_id, &credential_state.credential().await);
2462            let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2463            let call_executor =
2464                lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2465            let mut workload = node_ref.workload_dispatch.lock().await;
2466            if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2467                "on_ready",
2468                workload.on_ready(ready_ctx, invocation, &call_executor),
2469            )
2470            .await
2471            {
2472                tracing::warn!(error = %e, "workload on_ready returned Err");
2473            }
2474        }
2475
2476        // Create ActrRefShared
2477        let shared = Arc::new(ActrRefShared {
2478            actor_id,
2479            bootstrap_ctx_builder,
2480            credential_state,
2481            shutdown_token,
2482            task_handles: Mutex::new(task_handles),
2483        });
2484
2485        // Create ActrRef
2486        tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2487
2488        Ok(ActrRef { shared })
2489    }
2490}
2491
#[cfg(test)]
mod tests {
    use super::*;

    /// Extracts the `ConnectionNotReady` payload from `err`.
    ///
    /// # Panics
    ///
    /// Fails the calling test with the offending variant when `err` is any
    /// other `ActrError`.
    fn expect_connection_not_ready(err: ActrError) -> ConnectionNotReadyInfo {
        match err {
            ActrError::ConnectionNotReady(info) => info,
            other => panic!("expected ConnectionNotReady, got {other:?}"),
        }
    }

    /// `ConnectionNotReady` must map to its own wire code (10011) rather than
    /// sharing a code with another error variant.
    #[test]
    fn connection_not_ready_has_distinct_wire_code() {
        let code = protocol_error_to_code(&ActrError::ConnectionNotReady(
            ConnectionNotReadyInfo::new(1200, 6000),
        ));

        assert_eq!(code, 10011);
    }

    /// Decoding wire code 10011 yields `ConnectionNotReady` and recovers the
    /// retry hint embedded in the message text.
    #[test]
    fn connection_not_ready_wire_code_roundtrips_variant_and_retry_hint() {
        let message = String::from("connection not ready: retry_after_ms=Some(4800)");
        let info = expect_connection_not_ready(wire_code_to_actr_error(10011, message));

        assert_eq!(info.retry_after_ms, Some(4800));
    }

    /// A message without a retry hint still decodes to `ConnectionNotReady`,
    /// with the hint left unset.
    #[test]
    fn connection_not_ready_wire_code_handles_missing_retry_hint() {
        let info = expect_connection_not_ready(wire_code_to_actr_error(
            10011,
            String::from("connection not ready"),
        ));

        assert_eq!(info.retry_after_ms, None);
    }
}