Skip to main content

actr_hyper/lifecycle/
node.rs

1//! Node runtime inner — holds all running-state fields for an attached node.
2//!
3//! This module is the internal implementation backing the public
4//! `Node<Attached>` / `Node<Registered>` typestate chain defined in
5//! `crate::lib`. The struct itself is crate-private; consumers interact with
6//! it indirectly through `Node<S>` → `ActrRef` transitions.
7
8use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21    AIdCredential, ActorResult, ActrError, ActrId, PayloadType, RegisterAuthMode, RegisterRequest,
22    RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
/// Internal running-state of an attached node.
///
/// Holds every field required to run a workload after `Hyper::attach` has
/// bound a package. Kept private to the crate: external callers use the
/// public `Node<S>` wrappers in `crate::lib` and the `ActrRef` handle
/// returned by `Node::start`.
///
/// Several fields are `Option<_>` because they are only populated once
/// `start()` has run (see the per-field notes). Code that may run earlier,
/// such as `handle_incoming`, reports a missing value as an error instead of
/// unwrapping it.
///
/// Fields are dropped in declaration order, so reordering them changes the
/// order in which resources are torn down.
pub(crate) struct Inner {
    /// Runtime configuration (also the source of the ACL consulted by
    /// `handle_incoming`).
    pub(crate) config: actr_config::RuntimeConfig,

    /// Persistent mailbox (documented as SQLite-backed; the concrete backend
    /// is hidden behind `dyn Mailbox`).
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages.
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// In-process gate for `Dest::Shell` / `Dest::Local` calls.
    ///
    /// Created in `build()` together with `shell_to_workload` so the inproc
    /// lane is usable as soon as the node exists, even before registration.
    pub(crate) inproc_gate: Gate,

    /// Cross-process gate for `Dest::Actor(_)` calls.
    ///
    /// `None` until `start()` finishes WebRTC / PeerGate initialization. Any
    /// outbound call issued before that point returns `Internal("PeerGate
    /// not initialized yet")` — see `RuntimeContext::select_gate`.
    pub(crate) outproc_gate: Option<Gate>,

    /// DataStream callback registry shared between the inbound WebRTC / WS
    /// gates (which dispatch into it) and `RuntimeContext`
    /// (register_stream / send_data_stream).
    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,

    /// MediaTrack callback registry shared between WebRTC media tracks and
    /// `RuntimeContext` (register_media_track / send_media_sample).
    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,

    /// Signaling client.
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID, obtained during startup. `None` before that;
    /// `handle_incoming` rejects messages while it is unset.
    pub(crate) actor_id: Option<ActrId>,

    /// Shared actor credential, obtained during startup and used to
    /// authenticate subsequent messages. `None` before that.
    pub(crate) credential_state: Option<CredentialState>,

    /// WebRTC coordinator (created during startup).
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    /// Peer transport manager (created during startup).
    pub(crate) peer_transport: Option<Arc<crate::transport::PeerTransport>>,

    /// WebRTC gate (created during startup).
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// WebSocket gate for direct-connect inbound traffic (optional).
    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,

    /// Shell → Workload transport (REQUEST direction).
    ///
    /// The workload receives REQUESTs from the Shell with zero serialization:
    /// the `RpcEnvelope` is passed directly.
    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,

    /// Workload → Shell transport (RESPONSE direction).
    ///
    /// The workload sends RESPONSEs to the Shell. It keeps its own
    /// pending_requests, separate from the Shell's.
    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,

    /// Cancellation token used to signal graceful shutdown.
    pub(crate) shutdown_token: CancellationToken,

    /// Packaged manifest.lock.toml content loaded at startup for fingerprint lookups.
    ///
    /// Wrapped in `Arc` so per-request `RuntimeContext` clones only bump a refcount
    /// instead of deep-cloning the dependency vector.
    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,

    /// Network event receiver (from NetworkEventHandle).
    pub(crate) network_event_rx:
        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>>,

    /// Network event debounce configuration.
    pub(crate) network_event_debounce_config:
        Option<crate::lifecycle::network_event::DebounceConfig>,

    /// Request deduplication state. Responses are cached for `DEDUP_TTL`
    /// (see `lifecycle::dedup`) so a retried `request_id` is not processed twice.
    pub(crate) dedup_state: Arc<Mutex<DedupState>>,

    /// Verified package manifest for package-backed nodes (`None` otherwise).
    // NOTE(review): only read by `Inner::package_manifest`, which is itself
    // `#[allow(dead_code)]`; confirm whether the allow is still needed.
    #[allow(dead_code)]
    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,

    /// Pre-issued registration credential injected by the Hyper layer during
    /// the `Attached → Registered` state transition. `start()` uses it directly
    /// instead of re-registering with the signaling server.
    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,

    /// Shared WebSocket direct-connect address map, keyed by `ActrId` and
    /// populated by discovery.
    ///
    /// Shared with `DefaultWireBuilder` so discovered ws:// URLs can be reused
    /// directly instead of relying on a static url_template.
    pub(crate) discovered_ws_addresses:
        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,

    /// Runtime workload (WASM, dynclib, etc.).
    ///
    /// `handle_incoming` dispatches through this workload.
    ///
    /// The `Mutex` serializes dispatch into a single guest actor instance.
    /// `WasmWorkload::handle` and `DynClibWorkload::handle` both take
    /// `&mut self`, because the underlying Wasmtime `Store` / native guest
    /// ABI is single-threaded; concurrent dispatch through the same
    /// instance would be unsound.
    ///
    /// Lifecycle hooks also take this lock, because package-backed WASM /
    /// dynclib workloads expose them on the same guest instance. Transport
    /// and other observation hooks reach linked workloads through
    /// `hook_observer` without holding this lock.
    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,

    /// Optional shell-side observer that receives linked-workload transport /
    /// credential / mailbox hook invocations.
    ///
    /// `None` means "no observer installed"; the built-in tracing defaults
    /// still fire from the event-source wiring sites. When `Some`, hook
    /// invocations are dispatched through `lifecycle::hooks::spawn_hook`
    /// so panics in observer code cannot unwind into the event source.
    // NOTE(review): not read anywhere in this file's visible code, hence the
    // allow; confirm whether readers exist elsewhere.
    #[allow(dead_code)]
    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,

    /// Queue-length threshold at which the mailbox backpressure
    /// watchdog fires the framework `on_mailbox_backpressure` hook.
    ///
    /// Resolved from `HyperConfig` at node construction time so the
    /// runtime loop does not need to hold a reference back to `HyperConfig`.
    pub(crate) mailbox_backpressure_threshold: usize,

    /// Lead time before credential expiry at which the framework fires
    /// the `on_credential_expiring` hook. Resolved from `HyperConfig`
    /// at node construction time.
    // NOTE(review): not read anywhere in this file's visible code, hence the
    // allow; confirm whether readers exist elsewhere.
    #[allow(dead_code)]
    pub(crate) credential_expiry_warning: Duration,
}
175
/// Credential state for shared access between tasks.
///
/// Cloning is cheap: every clone shares the same `Arc<RwLock<_>>`, so an
/// `update` made through one handle is visible through all of them.
#[derive(Clone)]
pub struct CredentialState {
    inner: Arc<RwLock<CredentialStateInner>>,
}

/// Lock-protected payload behind [`CredentialState`].
#[derive(Clone)]
struct CredentialStateInner {
    /// Actor identity credential.
    credential: AIdCredential,
    /// Expiry time of `credential`; `None` when no expiry is known.
    expires_at: Option<prost_types::Timestamp>,
    /// HMAC time-limited TURN credential, updated together with the credential
    /// on registration/renewal.
    turn_credential: Option<TurnCredential>,
}
189
190impl CredentialState {
191    /// Create a new CredentialState with TURN credential
192    pub fn new(
193        credential: AIdCredential,
194        expires_at: Option<prost_types::Timestamp>,
195        turn_credential: Option<TurnCredential>,
196    ) -> Self {
197        Self {
198            inner: Arc::new(RwLock::new(CredentialStateInner {
199                credential,
200                expires_at,
201                turn_credential,
202            })),
203        }
204    }
205
206    pub async fn credential(&self) -> AIdCredential {
207        self.inner.read().await.credential.clone()
208    }
209
210    pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
211        self.inner.read().await.expires_at
212    }
213
214    /// Get TURN credential (HMAC time-limited credential)
215    pub async fn turn_credential(&self) -> Option<TurnCredential> {
216        self.inner.read().await.turn_credential.clone()
217    }
218
219    /// Update credential and TURN credential
220    ///
221    /// Called on credential renewal; only overwrites the old TURN credential when the new one is not empty
222    pub(crate) async fn update(
223        &self,
224        credential: AIdCredential,
225        expires_at: Option<prost_types::Timestamp>,
226        turn_credential: Option<TurnCredential>,
227    ) {
228        let mut guard = self.inner.write().await;
229        guard.credential = credential;
230        guard.expires_at = expires_at;
231        if turn_credential.is_some() {
232            guard.turn_credential = turn_credential;
233        }
234    }
235}
236
237/// Host operation executor - routes guest outbound calls through RuntimeContext
238///
239/// Called by the workload dispatch path in `handle_incoming`.
240async fn host_operation_handler(
241    ctx: crate::context::RuntimeContext,
242    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
243    pending: crate::workload::HostOperation,
244) -> crate::workload::HostOperationResult {
245    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
246    use actr_framework::guest::dynclib_abi::code as abi_code;
247    use actr_framework::{Context as _, Dest};
248    use actr_protocol::{DataStream, PayloadType};
249
250    /// Map `ActrError` to ABI error code, preserving semantics for guest-side discrimination
251    fn actr_error_to_code(err: &ActrError) -> i32 {
252        match err {
253            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
254            _ => abi_code::GENERIC_ERROR,
255        }
256    }
257
258    match pending {
259        HostOperation::CallRaw(req) => {
260            match ctx
261                .call_raw(
262                    &Dest::Actor(req.target),
263                    req.route_key,
264                    PayloadType::RpcReliable,
265                    bytes::Bytes::from(req.payload),
266                    30_000,
267                )
268                .await
269            {
270                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
271                Err(e) => {
272                    tracing::error!("call_raw routing failed: {e:?}");
273                    HostOperationResult::Error(actr_error_to_code(&e))
274                }
275            }
276        }
277
278        HostOperation::Call(req) => {
279            let dest = match decode_dest(&req.dest) {
280                Some(d) => d,
281                None => {
282                    tracing::error!(route_key = req.route_key, "call: dest decode failed");
283                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
284                }
285            };
286            match ctx
287                .call_raw(
288                    &dest,
289                    req.route_key,
290                    PayloadType::RpcReliable,
291                    bytes::Bytes::from(req.payload),
292                    30_000,
293                )
294                .await
295            {
296                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
297                Err(e) => {
298                    tracing::error!("call routing failed: {e:?}");
299                    HostOperationResult::Error(actr_error_to_code(&e))
300                }
301            }
302        }
303
304        HostOperation::Tell(req) => {
305            let dest = match decode_dest(&req.dest) {
306                Some(d) => d,
307                None => {
308                    tracing::error!(route_key = req.route_key, "tell: dest decode failed");
309                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
310                }
311            };
312            match ctx
313                .tell_raw(
314                    &dest,
315                    req.route_key,
316                    PayloadType::RpcReliable,
317                    bytes::Bytes::from(req.payload),
318                )
319                .await
320            {
321                Ok(()) => HostOperationResult::Done,
322                Err(e) => {
323                    tracing::error!("tell routing failed: {e:?}");
324                    HostOperationResult::Error(actr_error_to_code(&e))
325                }
326            }
327        }
328
329        HostOperation::Discover(req) => {
330            match ctx.discover_route_candidate(&req.target_type).await {
331                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
332                Err(e) => {
333                    tracing::error!("discover failed: {e:?}");
334                    HostOperationResult::Error(actr_error_to_code(&e))
335                }
336            }
337        }
338
339        HostOperation::RegisterStream(req) => {
340            let stream_id = req.stream_id;
341            let callback_ctx = ctx.clone();
342            let callback_workload_dispatch = workload_dispatch.clone();
343            match ctx
344                .register_stream(stream_id, move |chunk: DataStream, sender| {
345                    let ctx_for_executor = callback_ctx.clone();
346                    let workload_dispatch = callback_workload_dispatch.clone();
347                    Box::pin(async move {
348                        let invocation = crate::workload::InvocationContext {
349                            self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
350                            caller_id: Some(sender.clone()),
351                            request_id: format!(
352                                "data-stream:{}:{}",
353                                chunk.stream_id, chunk.sequence
354                            ),
355                        };
356                        let call_executor: crate::workload::HostAbiFn =
357                            std::sync::Arc::new(move |pending| {
358                                let ctx = ctx_for_executor.clone();
359                                Box::pin(async move {
360                                    stream_callback_host_operation_handler(ctx, pending).await
361                                })
362                            });
363                        let mut guard = workload_dispatch.lock().await;
364                        guard
365                            .dispatch_data_stream(chunk, sender, invocation, &call_executor)
366                            .await
367                    })
368                })
369                .await
370            {
371                Ok(()) => HostOperationResult::Done,
372                Err(e) => {
373                    tracing::error!("register_stream failed: {e:?}");
374                    HostOperationResult::Error(actr_error_to_code(&e))
375                }
376            }
377        }
378
379        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
380            Ok(()) => HostOperationResult::Done,
381            Err(e) => {
382                tracing::error!("unregister_stream failed: {e:?}");
383                HostOperationResult::Error(actr_error_to_code(&e))
384            }
385        },
386
387        HostOperation::SendDataStream(req) => {
388            let dest = match decode_dest(&req.dest) {
389                Some(d) => d,
390                None => {
391                    tracing::error!("send_data_stream: dest decode failed");
392                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
393                }
394            };
395            let payload_type = match PayloadType::try_from(req.payload_type) {
396                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
397                    PayloadType::try_from(req.payload_type).expect("checked payload type")
398                }
399                Ok(other) => {
400                    tracing::error!(?other, "send_data_stream: invalid stream payload type");
401                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
402                }
403                Err(_) => {
404                    tracing::error!(
405                        payload_type = req.payload_type,
406                        "send_data_stream: unknown payload type"
407                    );
408                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
409                }
410            };
411            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
412                Ok(()) => HostOperationResult::Done,
413                Err(e) => {
414                    tracing::error!("send_data_stream failed: {e:?}");
415                    HostOperationResult::Error(actr_error_to_code(&e))
416                }
417            }
418        }
419    }
420}
421
422fn lifecycle_invocation(
423    actor_id: &ActrId,
424    request_id: &'static str,
425) -> crate::workload::InvocationContext {
426    crate::workload::InvocationContext {
427        self_id: actor_id.clone(),
428        caller_id: None,
429        request_id: request_id.to_string(),
430    }
431}
432
433pub(crate) fn lifecycle_host_abi(
434    ctx: crate::context::RuntimeContext,
435    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
436) -> crate::workload::HostAbiFn {
437    std::sync::Arc::new(move |pending| {
438        let ctx = ctx.clone();
439        let workload_dispatch = workload_dispatch.clone();
440        Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
441    })
442}
443
444async fn stream_callback_host_operation_handler(
445    ctx: crate::context::RuntimeContext,
446    pending: crate::workload::HostOperation,
447) -> crate::workload::HostOperationResult {
448    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
449    use actr_framework::guest::dynclib_abi::code as abi_code;
450    use actr_framework::{Context as _, Dest};
451    use actr_protocol::PayloadType;
452
453    fn actr_error_to_code(err: &ActrError) -> i32 {
454        match err {
455            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
456            _ => abi_code::GENERIC_ERROR,
457        }
458    }
459
460    match pending {
461        HostOperation::CallRaw(req) => {
462            match ctx
463                .call_raw(
464                    &Dest::Actor(req.target),
465                    req.route_key,
466                    PayloadType::RpcReliable,
467                    bytes::Bytes::from(req.payload),
468                    30_000,
469                )
470                .await
471            {
472                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
473                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
474            }
475        }
476        HostOperation::Call(req) => {
477            let dest = match decode_dest(&req.dest) {
478                Some(d) => d,
479                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
480            };
481            match ctx
482                .call_raw(
483                    &dest,
484                    req.route_key,
485                    PayloadType::RpcReliable,
486                    bytes::Bytes::from(req.payload),
487                    30_000,
488                )
489                .await
490            {
491                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
492                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
493            }
494        }
495        HostOperation::Tell(req) => {
496            let dest = match decode_dest(&req.dest) {
497                Some(d) => d,
498                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
499            };
500            match ctx
501                .tell_raw(
502                    &dest,
503                    req.route_key,
504                    PayloadType::RpcReliable,
505                    bytes::Bytes::from(req.payload),
506                )
507                .await
508            {
509                Ok(()) => HostOperationResult::Done,
510                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
511            }
512        }
513        HostOperation::Discover(req) => {
514            match ctx.discover_route_candidate(&req.target_type).await {
515                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
516                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
517            }
518        }
519        HostOperation::RegisterStream(_) => {
520            tracing::error!("register_stream from inside a stream callback is not supported");
521            HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
522        }
523        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
524            Ok(()) => HostOperationResult::Done,
525            Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
526        },
527        HostOperation::SendDataStream(req) => {
528            let dest = match decode_dest(&req.dest) {
529                Some(d) => d,
530                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
531            };
532            let payload_type = match PayloadType::try_from(req.payload_type) {
533                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
534                    PayloadType::try_from(req.payload_type).expect("checked payload type")
535                }
536                Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
537            };
538            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
539                Ok(()) => HostOperationResult::Done,
540                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
541            }
542        }
543    }
544}
545
546/// Map ActrError to error code for ErrorResponse
547fn protocol_error_to_code(err: &ActrError) -> u32 {
548    match err {
549        ActrError::Unavailable(_) => 503,            // Service Unavailable
550        ActrError::TimedOut => 504,                  // Gateway Timeout
551        ActrError::NotFound(_) => 404,               // Not Found
552        ActrError::PermissionDenied(_) => 403,       // Forbidden
553        ActrError::InvalidArgument(_) => 400,        // Bad Request
554        ActrError::UnknownRoute(_) => 404,           // Not Found - route not found
555        ActrError::DependencyNotFound { .. } => 400, // Bad Request
556        ActrError::DecodeFailure(_) => 400,          // Bad Request - decode failure
557        ActrError::NotImplemented(_) => 501,         // Not Implemented
558        ActrError::Internal(_) => 500,               // Internal Server Error
559    }
560}
561
562impl Inner {
563    #[allow(dead_code)]
564    pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
565        self.package_manifest.as_ref()
566    }
567
568    /// Network event processing loop (background task)
569    ///
570    /// # Responsibilities
571    /// - Receive network events from Channel
572    /// - Delegate to NetworkEventProcessor for handling
573    /// - Record processing time and send results
574    async fn network_event_loop(
575        event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>,
576        event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
577        shutdown_token: CancellationToken,
578    ) {
579        crate::lifecycle::network_event::run_network_event_reconciler(
580            event_rx,
581            event_processor,
582            shutdown_token,
583        )
584        .await;
585    }
586
587    fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
588        if timeout_ms > 0 {
589            Duration::from_millis(timeout_ms as u64)
590        } else {
591            DEDUP_TTL
592        }
593    }
594
595    async fn wait_for_inflight_duplicate(
596        mut waiter: DedupWaiter,
597        timeout: Duration,
598    ) -> ActorResult<Bytes> {
599        let wait_for_result = async {
600            loop {
601                if let Some(result) = waiter.borrow().clone() {
602                    return result;
603                }
604
605                if waiter.changed().await.is_err() {
606                    if let Some(result) = waiter.borrow().clone() {
607                        return result;
608                    }
609                    return Err(ActrError::Unavailable(
610                        "duplicate request result unavailable".to_string(),
611                    ));
612                }
613            }
614        };
615
616        match tokio::time::timeout(timeout, wait_for_result).await {
617            Ok(result) => result,
618            Err(_) => Err(ActrError::Unavailable(format!(
619                "duplicate request in-flight timed out after {}ms",
620                timeout.as_millis()
621            ))),
622        }
623    }
624
625    /// - Single-hop calls: effectively identical
626    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
627    #[cfg_attr(
628        feature = "opentelemetry",
629        tracing::instrument(
630            skip_all,
631            name = "ActrNode.handle_incoming",
632            fields(
633                actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
634                route_key = %envelope.route_key,
635                request_id = %envelope.request_id,
636            )
637        )
638    )]
639    pub async fn handle_incoming(
640        &self,
641        envelope: RpcEnvelope,
642        caller_id: Option<&ActrId>,
643    ) -> ActorResult<Bytes> {
644        // Log received message
645        if let Some(caller) = caller_id {
646            tracing::debug!(
647                "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
648                envelope.route_key,
649                caller,
650                envelope.request_id
651            );
652        } else {
653            tracing::debug!(
654                "📨 Handling incoming message: route_key={}, request_id={}",
655                envelope.route_key,
656                envelope.request_id
657            );
658        }
659
660        // 0. Get actor_id early for ACL check
661        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
662            ActrError::Internal(
663                "Actor ID not set - node must be started before handling messages".to_string(),
664            )
665        })?;
666
667        // 0.1. ACL Permission Check (before processing message)
668        let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
669            .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
670
671        if !acl_allowed {
672            tracing::warn!(
673                severity = 5,
674                error_category = "acl_denied",
675                request_id = %envelope.request_id,
676                route_key = %envelope.route_key,
677                caller = %caller_id
678                    .map(|c| c.to_string())
679                    .unwrap_or_else(|| "<none>".to_string()),
680                "🚫 ACL: Permission denied"
681            );
682
683            return Err(ActrError::PermissionDenied(format!(
684                "ACL denied: {} is not allowed to call {}",
685                caller_id
686                    .map(|c| c.to_string())
687                    .unwrap_or_else(|| "<unknown>".to_string()),
688                actor_id
689            )));
690        }
691
692        // 0.2. Deduplication: return cached response for retried request_ids
693        let outcome = {
694            self.dedup_state
695                .lock()
696                .await
697                .check_or_mark(&envelope.request_id)
698        };
699        match outcome {
700            DedupOutcome::Fresh => {} // proceed normally
701            DedupOutcome::InFlight(waiter) => {
702                tracing::debug!(
703                    request_id = %envelope.request_id,
704                    route_key = %envelope.route_key,
705                    "duplicate request in-flight; waiting for original result"
706                );
707                return Self::wait_for_inflight_duplicate(
708                    waiter,
709                    Self::duplicate_wait_timeout(envelope.timeout_ms),
710                )
711                .await;
712            }
713            DedupOutcome::Duplicate(cached) => {
714                tracing::debug!(
715                    request_id = %envelope.request_id,
716                    route_key = %envelope.route_key,
717                    "♻️ returning cached response for duplicate request_id"
718                );
719                return cached;
720            }
721        }
722
723        // 1. Create Context with caller_id from transport layer
724        let credential_state = self.credential_state.clone().ok_or_else(|| {
725            ActrError::Internal(
726                "Credential not set - node must be started before handling messages".to_string(),
727            )
728        })?;
729        let ctx = self.make_runtime_context(
730            actor_id,
731            caller_id, // caller_id from transport layer (MessageRecord.from)
732            &envelope.request_id,
733            &credential_state.credential().await,
734        );
735
736        // 2. Dispatch
737        let dispatch_ctx = crate::workload::InvocationContext {
738            self_id: actor_id.clone(),
739            caller_id: caller_id.cloned(),
740            request_id: envelope.request_id.clone(),
741        };
742        let ctx_for_executor = ctx.clone();
743        let workload_for_executor = self.workload_dispatch.clone();
744        let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
745            let ctx = ctx_for_executor.clone();
746            let workload_dispatch = workload_for_executor.clone();
747            Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
748        });
749
750        let mut guard = self.workload_dispatch.lock().await;
751        let result = guard
752            .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
753            .await
754            .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e:?}")));
755
756        match &result {
757            Ok(_) => tracing::debug!(
758                request_id = %envelope.request_id,
759                route_key = %envelope.route_key,
760                "✅ Message handled successfully"
761            ),
762            Err(e) => tracing::error!(
763                severity = 6,
764                error_category = "handler_error",
765                request_id = %envelope.request_id,
766                route_key = %envelope.route_key,
767                "❌ Message handling failed: {:?}", e
768            ),
769        }
770
771        // 3. Store completed result in dedup cache before returning
772        self.dedup_state
773            .lock()
774            .await
775            .complete(&envelope.request_id, result.clone());
776
777        result
778    }
779
780    /// Build a new `Inner` from config and runtime workload.
781    ///
782    /// This is the internal constructor behind the public node builders and
783    /// Hyper package attach helpers.
784    pub(crate) async fn build(
785        config: actr_config::RuntimeConfig,
786        workload: crate::workload::Workload,
787        package_manifest: Option<actr_pack::PackageManifest>,
788        packaged_lock: Option<actr_config::lock::LockFile>,
789        mailbox_backpressure_threshold: usize,
790        credential_expiry_warning: Duration,
791    ) -> ActorResult<Self> {
792        use crate::outbound::{Gate, HostGate};
793        use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
794
795        tracing::info!("🚀 Initializing ActrNode");
796
797        // Initialize Mailbox
798        let mailbox_path = config
799            .mailbox_path
800            .as_ref()
801            .map(|p| p.to_string_lossy().to_string())
802            .unwrap_or_else(|| ":memory:".to_string());
803
804        tracing::info!("📂 Mailbox database path: {}", mailbox_path);
805
806        let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
807            actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
808                .await
809                .map_err(|e| {
810                    actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
811                })?,
812        );
813
814        // Initialize Dead Letter Queue
815        let dlq_path = if mailbox_path == ":memory:" {
816            ":memory:".to_string()
817        } else {
818            format!("{mailbox_path}.dlq")
819        };
820
821        let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
822            actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
823                .await
824                .map_err(|e| {
825                    actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
826                })?,
827        );
828        tracing::info!("✅ Dead Letter Queue initialized");
829
830        // Initialize signaling client
831        let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
832            Some("answer".to_string())
833        } else {
834            None
835        };
836
837        let signaling_config = SignalingConfig {
838            server_url: config.signaling_url.clone(),
839            connection_timeout: 30,
840            heartbeat_interval: 30,
841            reconnect_config: ReconnectConfig::default(),
842            auth_config: None,
843            webrtc_role,
844        };
845
846        let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
847        client.start_reconnect_manager();
848        let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
849
850        // Initialize inproc infrastructure (Shell ↔ Guest)
851        let shell_to_workload = Arc::new(HostTransport::new());
852        let workload_to_shell = Arc::new(HostTransport::new());
853        let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
854
855        let data_stream_registry = Arc::new(DataStreamRegistry::new());
856        let media_frame_registry = Arc::new(MediaFrameRegistry::new());
857
858        tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
859
860        let actr_lock = if let Some(lock) = packaged_lock {
861            tracing::info!(
862                "📋 Loaded packaged manifest.lock.toml with {} dependencies",
863                lock.dependencies.len()
864            );
865            Some(Arc::new(lock))
866        } else {
867            tracing::warn!(
868                "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
869            );
870            None
871        };
872
873        tracing::info!("✅ ActrNode initialized");
874
875        Ok(Self {
876            config,
877            mailbox,
878            dlq,
879            inproc_gate,
880            outproc_gate: None, // Populated in start() once WebRTC / PeerGate is ready.
881            data_stream_registry,
882            media_frame_registry,
883            signaling_client,
884            actor_id: None,
885            credential_state: None,
886            webrtc_coordinator: None,
887            peer_transport: None,
888            webrtc_gate: None,
889            websocket_gate: None,
890            shell_to_workload: Some(shell_to_workload),
891            workload_to_shell: Some(workload_to_shell),
892            shutdown_token: CancellationToken::new(),
893            actr_lock,
894            network_event_rx: None,
895            network_event_debounce_config: None,
896            dedup_state: Arc::new(Mutex::new(DedupState::new())),
897            package_manifest,
898            preregistered_credential: None,
899            discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
900                std::collections::HashMap::new(),
901            )),
902            workload_dispatch: Arc::new(Mutex::new(workload)),
903            hook_observer: None,
904            mailbox_backpressure_threshold,
905            credential_expiry_warning,
906        })
907    }
908
909    /// Snapshot the current runtime handles into a `BootstrapContextBuilder`.
910    ///
911    /// The returned builder is cloned into long-lived hook closures and into
912    /// `ActrRefShared` so those paths can materialize bootstrap contexts
913    /// without retaining a reference back to `Inner`. The snapshot freezes
914    /// `outproc_gate` and `actr_lock` at call time — callers that want to
915    /// observe a later-initialized `outproc_gate` must rebuild.
916    pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
917        BootstrapContextBuilder::new(
918            self.inproc_gate.clone(),
919            self.outproc_gate.clone(),
920            self.data_stream_registry.clone(),
921            self.media_frame_registry.clone(),
922            self.signaling_client.clone(),
923            self.actr_lock.clone(),
924        )
925    }
926
927    /// Build a `RuntimeContext` for the per-request dispatch path.
928    ///
929    /// Unlike `BootstrapContextBuilder::build_bootstrap`, this carries the
930    /// envelope's caller identity and request id through into the context.
931    pub(crate) fn make_runtime_context(
932        &self,
933        self_id: &ActrId,
934        caller_id: Option<&ActrId>,
935        request_id: &str,
936        credential: &AIdCredential,
937    ) -> RuntimeContext {
938        RuntimeContext::new(
939            self_id.clone(),
940            caller_id.cloned(),
941            request_id.to_string(),
942            self.inproc_gate.clone(),
943            self.outproc_gate.clone(),
944            self.data_stream_registry.clone(),
945            self.media_frame_registry.clone(),
946            self.signaling_client.clone(),
947            credential.clone(),
948            self.actr_lock.clone(),
949        )
950    }
951
952    /// Create network event processing infrastructure (called on demand, before `start()`).
953    ///
954    /// # Parameters
955    /// - `debounce_ms`: Debounce window in milliseconds. If 0, no debounce.
956    ///
957    /// # Panics
958    /// Panics if called more than once.
959    pub fn create_network_event_handle(
960        &mut self,
961        debounce_ms: u64,
962    ) -> crate::lifecycle::NetworkEventHandle {
963        if self.network_event_rx.is_some() {
964            panic!("create_network_event_handle() can only be called once");
965        }
966
967        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
968
969        let debounce_config = if debounce_ms > 0 {
970            Some(crate::lifecycle::network_event::DebounceConfig {
971                window: std::time::Duration::from_millis(debounce_ms),
972            })
973        } else {
974            None
975        };
976
977        self.network_event_rx = Some(event_rx);
978        self.network_event_debounce_config = debounce_config;
979
980        tracing::info!(
981            debounce_ms,
982            channel_capacity = 100_u64,
983            "network_event.node.handle_created"
984        );
985
986        crate::lifecycle::NetworkEventHandle::new(event_tx)
987    }
988
989    /// Attach a credential already issued by AIS so that `start()` can skip
990    /// the signaling registration step.
991    ///
992    /// Called by the Hyper layer between `Hyper::register()` and `Hyper::start()`.
993    pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
994        tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
995        self.preregistered_credential = Some(register_ok);
996    }
997
998    /// Start the system
999    pub async fn start(mut self) -> ActorResult<ActrRef> {
1000        tracing::info!("🚀 Starting ActrNode");
1001        tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1002
1003        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1004        // 1. Build RegisterRequest
1005        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1006        // Get ActrType from configuration
1007        let actr_type = self.config.actr_type().clone();
1008        tracing::info!("📋 Actor type: {}", actr_type);
1009
1010        // ServiceSpec is derived by the Hyper layer from the verified package
1011        // (see `service_spec::calculate_service_spec_from_package`). The raw
1012        // ActrNode::start() path has no package context and always sends None
1013        // on its own RegisterRequest; callers that need a spec must go
1014        // through `Hyper::register()`.
1015        let service_spec = None;
1016
1017        // If a WebSocket listen port is configured, build the advertised ws:// address
1018        // to register with the signaling server so clients can discover it.
1019        let ws_address = if let Some(port) = self.config.websocket_listen_port {
1020            let host = self
1021                .config
1022                .websocket_advertised_host
1023                .as_deref()
1024                .unwrap_or("127.0.0.1");
1025            Some(format!("ws://{}:{}", host, port))
1026        } else {
1027            None
1028        };
1029
1030        if let Some(ref addr) = ws_address {
1031            tracing::info!(
1032                "📡 Advertising WebSocket address to signaling server: {}",
1033                addr
1034            );
1035        }
1036
1037        let register_request = RegisterRequest {
1038            actr_type: actr_type.clone(),
1039            realm: self.config.realm,
1040            service_spec,
1041            acl: self.config.acl.clone(),
1042            service: None,
1043            ws_address,
1044            auth_mode: Some(RegisterAuthMode::Linked as i32),
1045            ..Default::default()
1046        };
1047
1048        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1049        // 1. Obtain registration info (Hyper pre-injected or AIS HTTP)
1050        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1051        let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1052            tracing::info!(
1053                "Using Hyper pre-injected registration credential; skipping AIS registration"
1054            );
1055            injected
1056        } else {
1057            let ais_endpoint = &self.config.ais_endpoint;
1058            tracing::info!(
1059                ais_endpoint = %ais_endpoint,
1060                "Registering actor with AIS via HTTP"
1061            );
1062            let mut ais = AisClient::new(ais_endpoint);
1063            if let Some(ref secret) = self.config.realm_secret {
1064                ais = ais.with_realm_secret(secret);
1065            }
1066            let resp = ais
1067                .register_linked(register_request.clone())
1068                .await
1069                .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1070            match resp.result {
1071                Some(register_response::Result::Success(ok)) => {
1072                    tracing::info!("✅ AIS HTTP registration successful");
1073                    ok
1074                }
1075                Some(register_response::Result::Error(error)) => {
1076                    tracing::error!(
1077                        severity = 10,
1078                        error_category = "registration_error",
1079                        error_code = error.code,
1080                        "❌ AIS registration failed: code={}, message={}",
1081                        error.code,
1082                        error.message
1083                    );
1084                    return Err(ActrError::Unavailable(format!(
1085                        "AIS registration rejected: {} (code: {})",
1086                        error.message, error.code
1087                    )));
1088                }
1089                None => {
1090                    tracing::error!(
1091                        severity = 10,
1092                        error_category = "registration_error",
1093                        "❌ AIS registration response missing result"
1094                    );
1095                    return Err(ActrError::Unavailable(
1096                        "Invalid AIS registration response: missing result".to_string(),
1097                    ));
1098                }
1099            }
1100        };
1101
1102        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1103        // 3. Set credential on signaling client, then connect signaling WS
1104        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1105        // The signaling server requires credential params in the WS URL for
1106        // authentication. We must set actor_id + credential BEFORE connecting
1107        // so that build_url_with_identity() includes them in the query string.
1108        let pre_connect_credential_state = {
1109            let actor_id = register_ok.actr_id.clone();
1110            let credential_state = CredentialState::new(
1111                register_ok.credential.clone(),
1112                register_ok.credential_expires_at,
1113                Some(register_ok.turn_credential.clone()),
1114            );
1115            self.signaling_client.set_actor_id(actor_id).await;
1116            self.signaling_client
1117                .set_credential_state(credential_state.clone())
1118                .await;
1119            credential_state
1120        };
1121
1122        // Install the signaling-side hook callback so that
1123        // SignalingConnectStart / Connected / Disconnected events flow
1124        // through the framework tracing defaults and into a
1125        // user-installed observer. Done BEFORE connect() so the initial
1126        // attempt produces a SignalingConnectStart event.
1127        {
1128            let actor_id = register_ok.actr_id.clone();
1129            let credential_state = pre_connect_credential_state.clone();
1130            // Snapshot at this point — outproc_gate is still None here, so
1131            // signaling-event contexts will carry None for outproc_gate
1132            // (matching the pre-existing behavior prior to B13 refactor).
1133            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1134            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1135                let snapshot = ctx_builder_snapshot.clone();
1136                let actor_id = actor_id.clone();
1137                let credential_state = credential_state.clone();
1138                Box::pin(async move {
1139                    Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1140                })
1141            });
1142            let cb = crate::lifecycle::hooks::build_hook_callback(
1143                self.hook_observer.clone(),
1144                ctx_builder,
1145            );
1146            self.signaling_client.set_hook_callback(cb);
1147        }
1148
1149        tracing::info!("📡 Connecting to signaling server (with credential)");
1150        self.signaling_client
1151            .connect()
1152            .await
1153            .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1154        tracing::info!("✅ Connected to signaling server");
1155
1156        // Collect background task handles so they can be managed by ActrRefShared later.
1157        let mut task_handles = Vec::new();
1158
1159        // Node-level hook callback, built inside the registration
1160        // setup block below and published back out into this wider
1161        // scope so the mailbox backpressure watchdog can subscribe.
1162        let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1163
1164        {
1165            let actor_id = register_ok.actr_id;
1166            let credential = register_ok.credential;
1167
1168            tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1169            tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1170            tracing::info!(
1171                "💓 Signaling heartbeat interval: {} seconds",
1172                register_ok.signaling_heartbeat_interval_secs
1173            );
1174
1175            // TurnCredential is a required field; should always be present under normal registration.
1176            tracing::debug!("TurnCredential received, TURN authentication ready");
1177
1178            if let Some(expires_at) = &register_ok.credential_expires_at {
1179                tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1180            }
1181
1182            // Store ActrId and credential state
1183            self.actor_id = Some(actor_id.clone());
1184            let credential_state = CredentialState::new(
1185                credential,
1186                register_ok.credential_expires_at,
1187                Some(register_ok.turn_credential.clone()),
1188            );
1189            self.credential_state = Some(credential_state.clone());
1190
1191            // Build the node-level lifecycle hook callback once: it is
1192            // reused for the initial `on_credential_renewed`, handed to
1193            // the heartbeat task for subsequent credential events, and
1194            // handed to the mailbox backpressure watchdog for
1195            // `on_mailbox_backpressure` on rising-edge crossings.
1196            //
1197            // The signaling layer already has its own callback installed
1198            // above — this second callback only carries credential and
1199            // mailbox-backpressure events, so no overlap with the
1200            // signaling-event plumbing.
1201            node_hook_callback =
1202                {
1203                    let actor_id_for_hook = actor_id.clone();
1204                    let credential_state_for_hook = credential_state.clone();
1205                    // Snapshot at this point — outproc_gate is still None
1206                    // here; credential / mailbox hook contexts inherit that
1207                    // and therefore cannot issue Dest::Actor(_) calls (same
1208                    // behavior as before B13 refactor).
1209                    let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1210                    let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1211                        Arc::new(move || {
1212                            let snapshot = ctx_builder_snapshot.clone();
1213                            let actor_id = actor_id_for_hook.clone();
1214                            let credential_state = credential_state_for_hook.clone();
1215                            Box::pin(async move {
1216                                Some(snapshot.build_bootstrap(
1217                                    &actor_id,
1218                                    &credential_state.credential().await,
1219                                ))
1220                            })
1221                        });
1222                    Some(crate::lifecycle::hooks::build_hook_callback(
1223                        self.hook_observer.clone(),
1224                        ctx_builder,
1225                    ))
1226                };
1227
1228            // Fire `on_credential_renewed` at initial registration: the
1229            // credential is considered "renewed" from "nothing" to the
1230            // value just issued by AIS. Subsequent renewals fire the
1231            // same hook from `lifecycle::heartbeat`.
1232            if let Some(expires_at) = &register_ok.credential_expires_at {
1233                let new_expiry = std::time::UNIX_EPOCH
1234                    + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1235                if let Some(cb) = node_hook_callback.as_ref() {
1236                    cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1237                } else {
1238                    tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1239                }
1240            }
1241
1242            // Note: actor_id and credential_state were already set on signaling_client
1243            // before connect (step 3 above), so reconnect URLs already carry correct auth.
1244
1245            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1246            // 1.3. Inproc transports were filled in during `build()`; nothing
1247            //      to stage here now that ContextFactory has been removed.
1248            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1249            tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1250
1251            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1252            // 1.5. Create WebRTC infrastructure
1253            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1254            tracing::info!("🌐 Initializing WebRTC infrastructure");
1255
1256            let media_frame_registry = self.media_frame_registry.clone();
1257
1258            // Create WebRtcCoordinator
1259            let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1260                actor_id.clone(),
1261                credential_state.clone(),
1262                self.signaling_client.clone(),
1263                self.config.webrtc.clone(),
1264                media_frame_registry,
1265            ));
1266
1267            // Install the WebRTC hook callback — fires
1268            // WebRtcConnectStart / Connected (with relayed info) /
1269            // Disconnected HookEvents on every peer state change.
1270            {
1271                let actor_id_for_hook = actor_id.clone();
1272                let credential_state_for_hook = credential_state.clone();
1273                // Snapshot before outproc_gate is wired up (just below). This
1274                // preserves the pre-refactor behavior where WebRTC-event
1275                // hook contexts carry outproc_gate = None.
1276                let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1277                let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1278                    Arc::new(move || {
1279                        let snapshot = ctx_builder_snapshot.clone();
1280                        let actor_id = actor_id_for_hook.clone();
1281                        let credential_state = credential_state_for_hook.clone();
1282                        Box::pin(async move {
1283                            Some(
1284                                snapshot.build_bootstrap(
1285                                    &actor_id,
1286                                    &credential_state.credential().await,
1287                                ),
1288                            )
1289                        })
1290                    });
1291                let cb = crate::lifecycle::hooks::build_hook_callback(
1292                    self.hook_observer.clone(),
1293                    ctx_builder,
1294                );
1295                coordinator.set_hook_callback(cb);
1296            }
1297
1298            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1299            // 1.6. Create PeerTransport + PeerGate (new architecture)
1300            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1301            tracing::info!("🏗️  Creating PeerTransport with WebRTC support");
1302
1303            // Create DefaultWireBuilder with WebRTC coordinator
1304            use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1305
1306            // WebSocket channel always enabled: target ws:// address is fully discovered at runtime
1307            // Direct-connect mode: encode local node ActrId as hex, sent as X-Actr-Node-Id
1308            let local_id_hex = hex::encode(actor_id.encode_to_vec());
1309            let wire_builder_config = DefaultWireBuilderConfig {
1310                local_id_hex,
1311                enable_webrtc: true,
1312                enable_websocket: true,
1313                // Share the discovered_ws_addresses map so that post-discovery calls
1314                // can use the signaling-provided ws:// URL for this actor node.
1315                discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1316                // Pass credential_state so outbound WS handshake carries X-Actr-Credential,
1317                // enabling peer WebSocketGate to perform Ed25519 signature verification.
1318                credential_state: Some(credential_state.clone()),
1319            };
1320            let wire_builder = Arc::new(DefaultWireBuilder::new(
1321                Some(coordinator.clone()),
1322                wire_builder_config,
1323            ));
1324
1325            // Create PeerTransport
1326            use crate::transport::PeerTransport;
1327            let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1328            self.peer_transport = Some(transport_manager.clone());
1329
1330            // Create PeerGate with WebRTC coordinator for MediaTrack support
1331            use crate::outbound::PeerGate;
1332            let outproc_gate =
1333                Arc::new(PeerGate::new(transport_manager, Some(coordinator.clone())));
1334            let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1335            tracing::info!("PeerTransport + PeerGate initialized");
1336
1337            let data_stream_registry = self.data_stream_registry.clone();
1338
1339            // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1340            let pending_requests = outproc_gate.get_pending_requests();
1341            let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1342                coordinator.clone(),
1343                pending_requests,
1344                data_stream_registry.clone(),
1345            ));
1346            // Set local_id
1347            gate.set_local_id(actor_id.clone()).await;
1348            tracing::info!(
1349                "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1350            );
1351
1352            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1353            // 1.7. Wire the outproc gate into Inner so subsequent
1354            //      `make_runtime_context` / `bootstrap_ctx_builder` calls
1355            //      observe it. All per-request contexts created by
1356            //      `handle_incoming` go through this field live.
1357            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1358            tracing::info!("🔧 Wiring outproc_gate into node");
1359            self.outproc_gate = Some(outproc_gate_enum);
1360            tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1361
1362            // Save references
1363            self.webrtc_coordinator = Some(coordinator.clone());
1364            self.webrtc_gate = Some(gate.clone());
1365            tracing::info!("✅ WebRTC infrastructure initialized");
1366
1367            // Fire `on_start` once the runtime context can see the initialized
1368            // gates, before starting request-accepting/background loops. Its
1369            // Err/panic aborts Node::start.
1370            {
1371                let startup_ctx = self
1372                    .bootstrap_ctx_builder()
1373                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1374                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1375                let call_executor =
1376                    lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1377                let mut workload = self.workload_dispatch.lock().await;
1378                crate::lifecycle::hooks::call_lifecycle_hook(
1379                    "on_start",
1380                    workload.on_start(startup_ctx, invocation, &call_executor),
1381                )
1382                .await?;
1383            }
1384
1385            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1386            // 1.7.6. WebSocket Server (direct-connect mode, optional)
1387            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1388            if let Some(listen_port) = self.config.websocket_listen_port {
1389                tracing::info!(
1390                    "🔌 WebSocket direct-connect mode enabled, binding port {}",
1391                    listen_port
1392                );
1393                use crate::key_cache::AisKeyCache;
1394                use crate::wire::websocket::gate::WsAuthContext;
1395                use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1396
1397                // Build AisKeyCache and seed it with the signing key from the registration response
1398                let ais_key_cache = AisKeyCache::new();
1399                if !register_ok.signing_pubkey.is_empty() {
1400                    match ais_key_cache
1401                        .seed(register_ok.signing_key_id, &register_ok.signing_pubkey)
1402                        .await
1403                    {
1404                        Ok(()) => tracing::info!(
1405                            key_id = register_ok.signing_key_id,
1406                            "🔑 AisKeyCache seeded from RegisterOk"
1407                        ),
1408                        Err(e) => tracing::warn!(
1409                            key_id = register_ok.signing_key_id,
1410                            error = ?e,
1411                            "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1412                        ),
1413                    }
1414                } else {
1415                    tracing::warn!(
1416                        "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1417                    );
1418                }
1419
1420                let auth_ctx = WsAuthContext {
1421                    ais_key_cache,
1422                    actor_id: actor_id.clone(),
1423                    credential_state: credential_state.clone(),
1424                    signaling_client: self.signaling_client.clone(),
1425                };
1426
1427                match WebSocketServer::bind(listen_port).await {
1428                    Ok((ws_server, conn_rx)) => {
1429                        ws_server.start(self.shutdown_token.clone());
1430                        let ws_gate = Arc::new(WebSocketGate::new(
1431                            conn_rx,
1432                            outproc_gate.get_pending_requests(),
1433                            data_stream_registry.clone(),
1434                            Some(auth_ctx),
1435                        ));
1436
1437                        // Install the WebSocket peer-lifecycle hook.
1438                        {
1439                            let actor_id_for_hook = actor_id.clone();
1440                            let credential_state_for_hook = credential_state.clone();
1441                            // Snapshot taken after outproc_gate is live: ws
1442                            // peer-lifecycle hook contexts can issue
1443                            // Dest::Actor(_) calls.
1444                            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1445                            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1446                                Arc::new(move || {
1447                                    let snapshot = ctx_builder_snapshot.clone();
1448                                    let actor_id = actor_id_for_hook.clone();
1449                                    let credential_state = credential_state_for_hook.clone();
1450                                    Box::pin(async move {
1451                                        Some(snapshot.build_bootstrap(
1452                                            &actor_id,
1453                                            &credential_state.credential().await,
1454                                        ))
1455                                    })
1456                                });
1457                            let cb = crate::lifecycle::hooks::build_hook_callback(
1458                                self.hook_observer.clone(),
1459                                ctx_builder,
1460                            );
1461                            ws_gate.set_hook_callback(cb);
1462                        }
1463
1464                        self.websocket_gate = Some(ws_gate);
1465                        tracing::info!(
1466                            "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1467                        );
1468                    }
1469                    Err(e) => {
1470                        tracing::error!(
1471                            "❌ Failed to bind WebSocket server on port {}: {:?}",
1472                            listen_port,
1473                            e
1474                        );
1475                    }
1476                }
1477            }
1478
1479            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1480            // 1.7.5. Create shared state for credential management
1481            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1482            // Shared credential state initialized above; reused across tasks
1483
1484            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1485            // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1486            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1487            {
1488                let shutdown = self.shutdown_token.clone();
1489                let client = self.signaling_client.clone();
1490                let actor_id_for_heartbeat = actor_id.clone();
1491                let credential_state_for_heartbeat = credential_state.clone();
1492                let mailbox_for_heartbeat = self.mailbox.clone();
1493                let register_request_for_heartbeat = register_request.clone();
1494                let webrtc_coordinator_for_heartbeat = self.webrtc_coordinator.clone();
1495                let webrtc_gate_for_heartbeat = self.webrtc_gate.clone();
1496
1497                // Use interval from registration response, default to 30s
1498                let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1499                let heartbeat_interval = if heartbeat_interval_secs > 0 {
1500                    Duration::from_secs(heartbeat_interval_secs as u64)
1501                } else {
1502                    Duration::from_secs(30)
1503                };
1504                let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1505                let realm_secret_for_heartbeat = self.config.realm_secret.clone();
1506                let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1507                    shutdown,
1508                    client,
1509                    actor_id_for_heartbeat,
1510                    credential_state_for_heartbeat,
1511                    mailbox_for_heartbeat,
1512                    heartbeat_interval,
1513                    register_request_for_heartbeat,
1514                    ais_endpoint_for_heartbeat,
1515                    realm_secret_for_heartbeat,
1516                    node_hook_callback.clone(),
1517                    webrtc_coordinator_for_heartbeat,
1518                    webrtc_gate_for_heartbeat,
1519                ));
1520                task_handles.push(heartbeat_handle);
1521            }
1522            tracing::info!(
1523                "✅ Heartbeat task started (interval: {}s)",
1524                register_ok.signaling_heartbeat_interval_secs
1525            );
1526
1527            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1528            // 1.8.5. Spawn network event processing loop
1529            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1530            if let Some(event_rx) = self.network_event_rx.take() {
1531                use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1532
1533                // Create DefaultNetworkEventProcessor
1534                // If debounce config exists, use new_with_debounce
1535                let event_processor =
1536                    if let Some(config) = self.network_event_debounce_config.clone() {
1537                        Arc::new(
1538                            DefaultNetworkEventProcessor::new_with_debounce_and_peer_transport(
1539                                self.signaling_client.clone(),
1540                                self.webrtc_coordinator.clone(),
1541                                config,
1542                                self.peer_transport.clone(),
1543                            ),
1544                        )
1545                    } else {
1546                        Arc::new(DefaultNetworkEventProcessor::new_with_peer_transport(
1547                            self.signaling_client.clone(),
1548                            self.webrtc_coordinator.clone(),
1549                            self.peer_transport.clone(),
1550                        ))
1551                    };
1552
1553                let shutdown = self.shutdown_token.clone();
1554                let network_event_handle = tokio::spawn(async move {
1555                    Self::network_event_loop(event_rx, event_processor, shutdown).await;
1556                });
1557                task_handles.push(network_event_handle);
1558                tracing::info!("network_event.node.loop_started");
1559            } else {
1560                tracing::debug!("network_event.node.loop_not_started_no_handle");
1561            }
1562
1563            {
1564                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1565                // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1566                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1567                //
1568                // This task:
1569                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1570                // - Then sends UnregisterRequest via signaling client with a timeout
1571                //
1572                // NOTE: we push its JoinHandle into task_handles so it can be aborted
1573                // by ActrRefShared::Drop if needed.
1574                let shutdown = self.shutdown_token.clone();
1575                let client = self.signaling_client.clone();
1576                let actor_id_for_unreg = actor_id.clone();
1577                let credential_state_for_unreg = credential_state.clone();
1578                let webrtc_coordinator = self.webrtc_coordinator.clone();
1579
1580                let unregister_handle = tokio::spawn(async move {
1581                    // Wait for shutdown signal
1582                    shutdown.cancelled().await;
1583                    tracing::info!(
1584                        "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1585                        actor_id_for_unreg
1586                    );
1587
1588                    // 1. Close all WebRTC peer connections first (if any)
1589                    if let Some(coord) = webrtc_coordinator {
1590                        if let Err(e) = coord.close_all_peers().await {
1591                            tracing::warn!(
1592                                "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1593                                e
1594                            );
1595                        } else {
1596                            tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1597                        }
1598                    } else {
1599                        tracing::debug!(
1600                            "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1601                        );
1602                    }
1603
1604                    // 2. Then send UnregisterRequest with a timeout (e.g. 5 seconds)
1605                    let result = tokio::time::timeout(
1606                        Duration::from_secs(5),
1607                        client.send_unregister_request(
1608                            actor_id_for_unreg.clone(),
1609                            credential_state_for_unreg.credential().await,
1610                            Some("Graceful shutdown".to_string()),
1611                        ),
1612                    )
1613                    .await;
1614                    tracing::info!("UnregisterRequest result: {:?}", result);
1615                    match result {
1616                        Ok(Ok(_)) => {
1617                            tracing::info!(
1618                                "✅ UnregisterRequest sent to signaling server for Actor {}",
1619                                actor_id_for_unreg
1620                            );
1621                        }
1622                        Ok(Err(e)) => {
1623                            tracing::warn!(
1624                                "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1625                                actor_id_for_unreg,
1626                                e
1627                            );
1628                        }
1629                        Err(_) => {
1630                            tracing::warn!(
1631                                "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1632                                actor_id_for_unreg
1633                            );
1634                        }
1635                    }
1636                });
1637
1638                task_handles.push(unregister_handle);
1639            }
1640        } // end registration setup block
1641
1642        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1643        // 2. Transport layer initialization (completed via WebRTC infrastructure)
1644        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1645        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1646
1647        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1648        // 3.1 Convert to Arc (before starting background loops)
1649        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1650        // Clone actor_id before moving self into Arc
1651        let actor_id = self
1652            .actor_id
1653            .as_ref()
1654            .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1655            .clone();
1656        // Snapshot now that outproc_gate has been wired above; this builder
1657        // is shared between on_start / on_stop hooks and the ActrRefShared
1658        // handle returned to the caller.
1659        let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1660        let credential_state = self
1661            .credential_state
1662            .clone()
1663            .expect("CredentialState must be initialized in start()");
1664        let shutdown_token = self.shutdown_token.clone();
1665        let node_ref = Arc::new(self);
1666
1667        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1668        // 3.2. Register workload-level stop hook.
1669        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1670        {
1671            let node = node_ref.clone();
1672            let actor_id = actor_id.clone();
1673            let credential_state = credential_state.clone();
1674            let shutdown = shutdown_token.clone();
1675            let on_stop_handle = tokio::spawn(async move {
1676                shutdown.cancelled().await;
1677                let stop_ctx = node
1678                    .bootstrap_ctx_builder()
1679                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1680                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1681                let call_executor =
1682                    lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1683                let mut workload = node.workload_dispatch.lock().await;
1684                if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1685                    "on_stop",
1686                    workload.on_stop(stop_ctx, invocation, &call_executor),
1687                )
1688                .await
1689                {
1690                    tracing::warn!(error = %e, "workload on_stop returned Err");
1691                }
1692            });
1693            task_handles.push(on_stop_handle);
1694        }
1695
1696        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1697        // 3.5. Start WebRTC background loops
1698        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1699        tracing::info!("🚀 Starting WebRTC background loops");
1700
1701        // Start WebRtcCoordinator signaling loop
1702        if let Some(coordinator) = &node_ref.webrtc_coordinator {
1703            coordinator.clone().start().await.map_err(|e| {
1704                ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1705            })?;
1706            tracing::info!("✅ WebRtcCoordinator signaling loop started");
1707        }
1708
1709        // Start WebRtcGate message receive loop (route to Mailbox)
1710        if let Some(gate) = &node_ref.webrtc_gate {
1711            gate.start_receive_loop(node_ref.mailbox.clone())
1712                .await
1713                .map_err(|e| {
1714                    ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1715                })?;
1716            tracing::info!("✅ WebRtcGate → Mailbox routing started");
1717        }
1718
1719        // Start WebSocketGate message receive loop (route to Mailbox, direct-connect mode)
1720        if let Some(ws_gate) = &node_ref.websocket_gate {
1721            ws_gate
1722                .start_receive_loop(node_ref.mailbox.clone())
1723                .await
1724                .map_err(|e| {
1725                    ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1726                })?;
1727            tracing::info!("✅ WebSocketGate → Mailbox routing started");
1728        }
1729        tracing::info!("✅ WebRTC background loops started");
1730
1731        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1732        // 4.6. Start Inproc receive loop (Shell → Guest)
1733        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1734        if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1735            tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1736            // Start Guest receive loop (Shell → Guest REQUEST)
1737            if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1738                let node = node_ref.clone();
1739                let request_rx_lane = shell_to_workload
1740                    .get_lane(PayloadType::RpcReliable, None)
1741                    .await
1742                    .map_err(|e| {
1743                        ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1744                    })?;
1745                let response_tx = workload_to_shell.clone();
1746                let shutdown = shutdown_token.clone();
1747
1748                let inproc_handle = tokio::spawn(async move {
1749                    loop {
1750                        tokio::select! {
1751                            _ = shutdown.cancelled() => {
1752                                tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1753                                break;
1754                            }
1755                            envelope_result = request_rx_lane.recv_envelope() => {
1756                                match envelope_result {
1757                                    Ok(envelope) => {
1758                                        let request_id = envelope.request_id.clone();
1759                                        tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1760                                        // Extract and set tracing context from envelope
1761                                        #[cfg(feature = "opentelemetry")]
1762                                        let span = {
1763                                            let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1764                                            let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1765                                            set_parent_from_rpc_envelope(&span, &envelope);
1766                                            span
1767                                        };
1768
1769                                        // Shell calls have no caller_id (local process communication)
1770                                        let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1771                                        #[cfg(feature = "opentelemetry")]
1772                                        let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1773
1774                                        match handle_incoming_fut.await {
1775                                            Ok(response_bytes) => {
1776                                                // Send RESPONSE back via workload_to_shell
1777                                                // Keep same route_key (no prefix needed - separate channels!)
1778                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1779                                                let mut response_envelope = RpcEnvelope {
1780                                                    route_key: envelope.route_key.clone(),
1781                                                    payload: Some(response_bytes),
1782                                                    error: None,
1783                                                    traceparent: None,
1784                                                    tracestate: None,
1785                                                    request_id: request_id.clone(),
1786                                                    metadata: Vec::new(),
1787                                                    timeout_ms: 30000,
1788                                                };
1789                                                // Inject tracing context
1790                                                #[cfg(feature = "opentelemetry")]
1791                                                inject_span_context_to_rpc(&span, &mut response_envelope);
1792
1793                                                // Send via Guest → Shell channel
1794                                                let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1795                                                #[cfg(feature = "opentelemetry")]
1796                                                let send_response_fut = send_response_fut.instrument(span.clone());
1797                                                if let Err(e) = send_response_fut.await {
1798                                                    tracing::error!(
1799                                                        severity = 7,
1800                                                        error_category = "transport_error",
1801                                                        request_id = %request_id,
1802                                                        "❌ Failed to send RESPONSE to Shell: {:?}",
1803                                                        e
1804                                                    );
1805                                                }
1806                                            }
1807                                            Err(e) => {
1808                                                tracing::error!(
1809                                                    severity = 6,
1810                                                    error_category = "handler_error",
1811                                                    request_id = %request_id,
1812                                                    route_key = %envelope.route_key,
1813                                                    "❌ Guest message handling failed: {:?}",
1814                                                    e
1815                                                );
1816
1817                                                // Send error response (system-level error on envelope)
1818                                                let error_response = actr_protocol::ErrorResponse {
1819                                                    code: protocol_error_to_code(&e),
1820                                                    message: e.to_string(),
1821                                                };
1822                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1823                                                let mut error_envelope = RpcEnvelope {
1824                                                    route_key: envelope.route_key.clone(),
1825                                                    payload: None,
1826                                                    error: Some(error_response),
1827                                                    traceparent: envelope.traceparent.clone(),
1828                                                    tracestate: envelope.tracestate.clone(),
1829                                                    request_id: request_id.clone(),
1830                                                    metadata: Vec::new(),
1831                                                    timeout_ms: 30000,
1832                                                };
1833                                                // Inject tracing context
1834                                                #[cfg(feature = "opentelemetry")]
1835                                                inject_span_context_to_rpc(&span, &mut error_envelope);
1836
1837                                                let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1838                                                #[cfg(feature = "opentelemetry")]
1839                                                let send_error_response_fut = send_error_response_fut.instrument(span);
1840                                                if let Err(send_err) = send_error_response_fut.await {
1841                                                    tracing::error!(
1842                                                        severity = 7,
1843                                                        error_category = "transport_error",
1844                                                        request_id = %request_id,
1845                                                        "❌ Failed to send ERROR response to Shell: {:?}",
1846                                                        send_err
1847                                                    );
1848                                                }
1849                                            }
1850                                        }
1851                                    }
1852                                    Err(e) => {
1853                                        tracing::error!(
1854                                            severity = 8,
1855                                            error_category = "transport_error",
1856                                            "❌ Failed to receive from Shell → Guest lane: {:?}",
1857                                            e
1858                                        );
1859                                        break;
1860                                    }
1861                                }
1862                            }
1863                        }
1864                    }
1865                    tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1866                });
1867                task_handles.push(inproc_handle);
1868            }
1869        }
1870        tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1871
1872        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1873        // 4.7. Start Shell receive loop (Guest → Shell RESPONSE)
1874        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1875        tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1876        if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1877            // Start Shell receive loop (Guest → Shell RESPONSE)
1878            if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1879                let response_rx_lane = workload_to_shell
1880                    .get_lane(PayloadType::RpcReliable, None)
1881                    .await
1882                    .map_err(|e| {
1883                        ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1884                    })?;
1885                let request_mgr = shell_to_workload.clone();
1886                let shutdown = shutdown_token.clone();
1887
1888                let shell_receive_handle = tokio::spawn(async move {
1889                    loop {
1890                        tokio::select! {
1891                            _ = shutdown.cancelled() => {
1892                                tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1893                                break;
1894                            }
1895                            envelope_result = response_rx_lane.recv_envelope() => {
1896                                match envelope_result {
1897                                    Ok(envelope) => {
1898                                        tracing::debug!(
1899                                            "📨 Shell received RESPONSE from Guest: request_id={}",
1900                                            envelope.request_id
1901                                        );
1902
1903                                        // Check if response is success or error
1904                                        match (envelope.payload, envelope.error) {
1905                                            (Some(payload), None) => {
1906                                                // Success response
1907                                                if let Err(e) = request_mgr
1908                                                    .complete_response(&envelope.request_id, payload)
1909                                                    .await
1910                                                {
1911                                                    tracing::warn!(
1912                                                        severity = 4,
1913                                                        error_category = "orphan_response",
1914                                                        request_id = %envelope.request_id,
1915                                                        "⚠️  No pending request found for response: {:?}",
1916                                                        e
1917                                                    );
1918                                                }
1919                                            }
1920                                            (None, Some(error)) => {
1921                                                // Error response - convert to ActrError and complete with error
1922                                                let actr_err = ActrError::Unavailable(format!("RPC error {}: {}", error.code, error.message));
1923                                                if let Err(e) = request_mgr
1924                                                    .complete_error(&envelope.request_id, actr_err)
1925                                                    .await
1926                                                {
1927                                                    tracing::warn!(
1928                                                        severity = 4,
1929                                                        error_category = "orphan_response",
1930                                                        request_id = %envelope.request_id,
1931                                                        "⚠️  No pending request found for error response: {:?}",
1932                                                        e
1933                                                    );
1934                                                }
1935                                            }
1936                                            _ => {
1937                                                tracing::error!(
1938                                                    severity = 7,
1939                                                    error_category = "protocol_error",
1940                                                    request_id = %envelope.request_id,
1941                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1942                                                );
1943                                            }
1944                                        }
1945                                    }
1946                                    Err(e) => {
1947                                        tracing::error!(
1948                                            severity = 8,
1949                                            error_category = "transport_error",
1950                                            "❌ Failed to receive from Guest → Shell lane: {:?}",
1951                                            e
1952                                        );
1953                                        break;
1954                                    }
1955                                }
1956                            }
1957                        }
1958                    }
1959                    tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
1960                });
1961                task_handles.push(shell_receive_handle);
1962            }
1963        }
1964        tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
1965
1966        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1967        // 4.9. Mailbox backpressure watchdog
1968        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1969        //
1970        // Emits the framework `on_mailbox_backpressure` hook once per
1971        // rising-edge crossing of the configured threshold.
1972        //
1973        // Preferred path: a push-based notification from the mailbox
1974        // backend via [`Mailbox::set_depth_observer`], which runs
1975        // synchronously on every enqueue and has zero worst-case delay.
1976        //
1977        // Fallback path: mailbox backends without depth support (or
1978        // which can't cheaply compute depth on every enqueue) keep
1979        // using a 1 Hz poll of [`Mailbox::status`].
1980        let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
1981        {
1982            use std::sync::atomic::{AtomicBool, Ordering};
1983            let mailbox = node_ref.mailbox.clone();
1984            let shutdown = shutdown_token.clone();
1985            let hook_cb = node_hook_callback.clone();
1986            let triggered = Arc::new(AtomicBool::new(false));
1987
1988            // Shared rising-edge state + hook-firing closure used by
1989            // both the push and polling code paths.
1990            let fire_if_rising = {
1991                let triggered = triggered.clone();
1992                let hook_cb = hook_cb.clone();
1993                Arc::new(move |queue_len: usize| {
1994                    if queue_len >= backpressure_threshold {
1995                        if !triggered.swap(true, Ordering::AcqRel) {
1996                            if let Some(cb) = hook_cb.as_ref() {
1997                                let cb = cb.clone();
1998                                tokio::spawn(async move {
1999                                    cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
2000                                        queue_len,
2001                                        threshold: backpressure_threshold,
2002                                    })
2003                                    .await;
2004                                });
2005                            } else {
2006                                tracing::warn!(
2007                                    queue_len,
2008                                    threshold = backpressure_threshold,
2009                                    "mailbox backpressure",
2010                                );
2011                            }
2012                        }
2013                    } else if triggered.swap(false, Ordering::AcqRel) {
2014                        tracing::info!(
2015                            queue_len,
2016                            threshold = backpressure_threshold,
2017                            "mailbox backpressure cleared",
2018                        );
2019                    }
2020                })
2021            };
2022
2023            // Try the push path first. The observer installs only if
2024            // the backend supports it; otherwise `installed` is `false`
2025            // and we fall through to polling.
2026            struct EnqueueObserver {
2027                fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2028            }
2029            impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2030                fn on_depth_change(&self, queued_messages: usize) {
2031                    (self.fire)(queued_messages);
2032                }
2033            }
2034
2035            let installed = {
2036                let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2037                    Arc::new(EnqueueObserver {
2038                        fire: fire_if_rising.clone(),
2039                    });
2040                mailbox.set_depth_observer(observer)
2041            };
2042
2043            if installed {
2044                tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2045            } else {
2046                tracing::debug!(
2047                    "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2048                );
2049                let mailbox_for_poll = mailbox.clone();
2050                let shutdown_for_poll = shutdown.clone();
2051                let fire_for_poll = fire_if_rising.clone();
2052                let watchdog_handle = tokio::spawn(async move {
2053                    let mut ticker = tokio::time::interval(Duration::from_secs(1));
2054                    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2055                    loop {
2056                        tokio::select! {
2057                            _ = shutdown_for_poll.cancelled() => {
2058                                tracing::debug!(
2059                                    "mailbox backpressure watchdog shutting down"
2060                                );
2061                                break;
2062                            }
2063                            _ = ticker.tick() => {
2064                                let status = match mailbox_for_poll.status().await {
2065                                    Ok(s) => s,
2066                                    Err(e) => {
2067                                        tracing::debug!(?e, "mailbox status poll failed");
2068                                        continue;
2069                                    }
2070                                };
2071                                fire_for_poll(status.queued_messages as usize);
2072                            }
2073                        }
2074                    }
2075                });
2076                task_handles.push(watchdog_handle);
2077            }
2078        }
2079
2080        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2081        // 5. Start Mailbox processing loop (State Path)
2082        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2083        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2084        {
2085            let node = node_ref.clone();
2086            let mailbox = node_ref.mailbox.clone();
2087            let gate = node_ref.webrtc_gate.clone();
2088            let shutdown = shutdown_token.clone();
2089
2090            let mailbox_handle = tokio::spawn(async move {
2091                loop {
2092                    tokio::select! {
2093                        // Listen for shutdown signal
2094                        _ = shutdown.cancelled() => {
2095                            tracing::info!("📭 Mailbox loop received shutdown signal");
2096                            break;
2097                        }
2098                        // Dequeue messages (by priority)
2099                        result = mailbox.dequeue() => {
2100                            match result {
2101                                Ok(messages) => {
2102                                    if messages.is_empty() {
2103                                        // Queue empty, sleep briefly
2104                                        tokio::time::sleep(Duration::from_millis(10)).await;
2105                                        continue;
2106                                    }
2107                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2108
2109                                    // Process messages one by one
2110                                    for msg_record in messages {
2111                                        // Deserialize RpcEnvelope (Protobuf)
2112                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
2113                                            Ok(envelope) => {
2114                                                let request_id = envelope.request_id.clone();
2115                                                let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2116                                                tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2117
2118                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
2119                                                #[cfg(feature = "opentelemetry")]
2120                                                let span = {
2121                                                    let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2122                                                    let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2123                                                    set_parent_from_rpc_envelope(&span, &envelope);
2124                                                    span
2125                                                };
2126
2127                                                // Decode caller_id from MessageRecord.from (transport layer)
2128                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
2129                                                let caller_id_ref = caller_id_result.as_ref().ok();
2130
2131                                                if caller_id_ref.is_none() {
2132                                                    tracing::warn!(
2133                                                        request_id = %request_id,
2134                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
2135                                                    );
2136                                                }
2137
2138                                                // Call handle_incoming with caller_id from transport layer
2139                                                let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2140                                                #[cfg(feature = "opentelemetry")]
2141                                                let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2142
2143                                                match handle_incoming_fut.await {
2144                                                    Ok(response_bytes) => {
2145                                                        // Send response (reuse request_id)
2146                                                        if let Some(ref gate) = gate {
2147                                                            // Use already decoded caller_id
2148                                                            match caller_id_result {
2149                                                                Ok(caller) => {
2150                                                                    // Construct response RpcEnvelope (reuse request_id!)
2151                                                                    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2152                                                                    let mut response_envelope = RpcEnvelope {
2153                                                                        request_id, // Reuse!
2154                                                                        route_key: envelope.route_key.clone(),
2155                                                                        payload: Some(response_bytes),
2156                                                                        error: None,
2157                                                                        traceparent: envelope.traceparent.clone(),
2158                                                                        tracestate: envelope.tracestate.clone(),
2159                                                                        metadata: Vec::new(), // Response doesn't need extra metadata
2160                                                                        timeout_ms: 30000,
2161                                                                    };
2162                                                                    // Inject tracing context
2163                                                                    #[cfg(feature = "opentelemetry")]
2164                                                                    inject_span_context_to_rpc(&span, &mut response_envelope);
2165
2166                                                                    let send_response_fut = gate.send_response(&caller, response_envelope);
2167                                                                    #[cfg(feature = "opentelemetry")]
2168                                                                    let send_response_fut = send_response_fut.instrument(span);
2169                                                                    if let Err(e) = send_response_fut.await {
2170                                                                        tracing::error!(
2171                                                                            severity = 7,
2172                                                                            error_category = "transport_error",
2173                                                                            request_id = %envelope.request_id,
2174                                                                            "❌ Failed to send response: {:?}",
2175                                                                            e
2176                                                                        );
2177                                                                    }
2178                                                                }
2179                                                                Err(e) => {
2180                                                                    tracing::error!(
2181                                                                        severity = 8,
2182                                                                        error_category = "protobuf_decode",
2183                                                                        request_id = %envelope.request_id,
2184                                                                        "❌ Failed to decode caller_id: {:?}",
2185                                                                        e
2186                                                                    );
2187                                                                }
2188                                                            }
2189                                                        }
2190
2191                                                        // ACK message
2192                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
2193                                                            tracing::error!(
2194                                                                severity = 9,
2195                                                                error_category = "mailbox_error",
2196                                                                request_id = %envelope.request_id,
2197                                                                message_id = %msg_record.id,
2198                                                                "❌ Mailbox ACK failed: {:?}",
2199                                                                e
2200                                                            );
2201                                                        }
2202                                                    }
2203                                                    Err(e) => {
2204                                                        tracing::error!(
2205                                                            severity = 6,
2206                                                            error_category = "handler_error",
2207                                                            request_id = %envelope.request_id,
2208                                                            route_key = %envelope.route_key,
2209                                                            "❌ handle_incoming failed: {:?}", e
2210                                                        );
2211                                                        // ACK to avoid infinite retries
2212                                                        // Application errors are caller's responsibility
2213                                                        let _ = mailbox.ack(msg_record.id).await;
2214                                                    }
2215                                                }
2216                                            }
2217                                            Err(e) => {
2218                                                // Poison message - cannot decode RpcEnvelope
2219                                                tracing::error!(
2220                                                    severity = 9,
2221                                                    error_category = "protobuf_decode",
2222                                                    message_id = %msg_record.id,
2223                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2224                                                    e
2225                                                );
2226
2227                                                // Write to Dead Letter Queue
2228                                                use actr_runtime_mailbox::DlqRecord;
2229                                                use chrono::Utc;
2230                                                use uuid::Uuid;
2231
2232                                                let dlq_record = DlqRecord {
2233                                                    id: Uuid::new_v4(),
2234                                                    original_message_id: Some(msg_record.id.to_string()),
2235                                                    from: Some(msg_record.from.clone()),
2236                                                    to: node.actor_id.as_ref().map(|id| {
2237                                                        let mut buf = Vec::new();
2238                                                        id.encode(&mut buf).unwrap();
2239                                                        buf
2240                                                    }),
2241                                                    raw_bytes: msg_record.payload.clone(),
2242                                                    error_message: format!("Protobuf decode failed: {e}"),
2243                                                    error_category: "protobuf_decode".to_string(),
2244                                                    trace_id: format!("mailbox-{}", msg_record.id),
2245                                                    request_id: None,
2246                                                    created_at: Utc::now(),
2247                                                    redrive_attempts: 0,
2248                                                    last_redrive_at: None,
2249                                                    context: Some(format!(
2250                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
2251                                                        match msg_record.priority {
2252                                                            actr_runtime_mailbox::MessagePriority::High => "high",
2253                                                            actr_runtime_mailbox::MessagePriority::Normal => "normal",
2254                                                        }
2255                                                    )),
2256                                                };
2257
2258                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2259                                                    tracing::error!(
2260                                                        severity = 10,
2261                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2262                                                        dlq_err
2263                                                    );
2264                                                } else {
2265                                                    tracing::warn!(
2266                                                        severity = 9,
2267                                                        "☠️ Poison message moved to DLQ: message_id={}",
2268                                                        msg_record.id
2269                                                    );
2270                                                }
2271
2272                                                // ACK the poison message to remove from mailbox
2273                                                let _ = mailbox.ack(msg_record.id).await;
2274                                            }
2275                                        }
2276                                    }
2277                                }
2278                                Err(e) => {
2279                                    tracing::error!(
2280                                        severity = 9,
2281                                        error_category = "mailbox_error",
2282                                        "❌ Mailbox dequeue failed: {:?}", e
2283                                    );
2284                                    tokio::time::sleep(Duration::from_secs(1)).await;
2285                                }
2286                            }
2287                        }
2288                    }
2289                }
2290                tracing::info!("✅ Mailbox processing loop terminated gracefully");
2291            });
2292
2293            task_handles.push(mailbox_handle);
2294        }
2295        tracing::info!("✅ Mailbox processing loop started");
2296        tracing::info!("✅ ActrNode started successfully");
2297
2298        {
2299            let ready_ctx = bootstrap_ctx_builder
2300                .build_bootstrap(&actor_id, &credential_state.credential().await);
2301            let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2302            let call_executor =
2303                lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2304            let mut workload = node_ref.workload_dispatch.lock().await;
2305            if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2306                "on_ready",
2307                workload.on_ready(ready_ctx, invocation, &call_executor),
2308            )
2309            .await
2310            {
2311                tracing::warn!(error = %e, "workload on_ready returned Err");
2312            }
2313        }
2314
2315        // Create ActrRefShared
2316        let shared = Arc::new(ActrRefShared {
2317            actor_id,
2318            bootstrap_ctx_builder,
2319            credential_state,
2320            shutdown_token,
2321            task_handles: Mutex::new(task_handles),
2322        });
2323
2324        // Create ActrRef
2325        tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2326
2327        Ok(ActrRef { shared })
2328    }
2329}