Skip to main content

actr_hyper/lifecycle/
node.rs

1//! Node runtime inner — holds all running-state fields for an attached node.
2//!
3//! This module is the internal implementation backing the public
4//! `Node<Attached>` / `Node<Registered>` typestate chain defined in
5//! `crate::lib`. The struct itself is crate-private; consumers interact with
6//! it indirectly through `Node<S>` → `ActrRef` transitions.
7
8use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21    AIdCredential, ActorResult, ActrError, ActrId, PayloadType, RegisterAuthMode, RegisterRequest,
22    RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
33/// Internal running-state of an attached node.
34///
35/// Holds every field required to run a workload after `Hyper::attach` has
36/// bound a package. Kept private to the crate: external callers use the
37/// public `Node<S>` wrappers in `crate::lib` and the `ActrRef` handle
38/// returned by `Node::start`.
39pub(crate) struct Inner {
40    /// Runtime configuration
41    pub(crate) config: actr_config::RuntimeConfig,
42
43    /// SQLite persistent mailbox
44    pub(crate) mailbox: Arc<dyn Mailbox>,
45
46    /// Dead Letter Queue for poison messages
47    pub(crate) dlq: Arc<dyn DeadLetterQueue>,
48
49    /// In-process gate for `Dest::Shell` / `Dest::Local` calls.
50    ///
51    /// Created in `build()` together with `shell_to_workload` so the inproc
52    /// lane is usable as soon as the node exists, even before registration.
53    pub(crate) inproc_gate: Gate,
54
55    /// Cross-process gate for `Dest::Actor(_)` calls.
56    ///
57    /// `None` until `start()` finishes WebRTC / PeerGate initialization. Any
58    /// outbound call issued before that point returns `Internal("PeerGate
59    /// not initialized yet")` — see `RuntimeContext::select_gate`.
60    pub(crate) outproc_gate: Option<Gate>,
61
62    /// DataStream callback registry shared between the inbound WebRTC / WS
63    /// gates (which dispatch into it) and `RuntimeContext`
64    /// (register_stream / send_data_stream).
65    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,
66
67    /// MediaTrack callback registry shared between WebRTC media tracks and
68    /// `RuntimeContext` (register_media_track / send_media_sample).
69    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,
70
71    /// Signaling client
72    pub(crate) signaling_client: Arc<dyn SignalingClient>,
73
74    /// Actor ID (obtained after startup)
75    pub(crate) actor_id: Option<ActrId>,
76
77    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
78    pub(crate) credential_state: Option<CredentialState>,
79
80    /// WebRTC coordinator (created after startup)
81    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
82
83    /// Peer transport manager (created after startup)
84    pub(crate) peer_transport: Option<Arc<crate::transport::PeerTransport>>,
85
86    /// WebRTC Gate (created after startup)
87    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
88
89    /// WebSocket Gate (direct-connect mode inbound, optional)
90    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,
91
92    /// Shell → Workload transport (REQUEST direction)
93    ///
94    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
95    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,
96
97    /// Workload → Shell transport (RESPONSE direction)
98    ///
99    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
100    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,
101
102    /// Shutdown token for graceful shutdown
103    pub(crate) shutdown_token: CancellationToken,
104
105    /// Packaged manifest.lock.toml content loaded at startup for fingerprint lookups.
106    ///
107    /// Wrapped in `Arc` so per-request `RuntimeContext` clones only bump a refcount
108    /// instead of deep-cloning the dependency vector.
109    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,
110    /// Network event receiver (from NetworkEventHandle)
111    pub(crate) network_event_rx:
112        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>>,
113
114    /// Network event debounce configuration
115    pub(crate) network_event_debounce_config:
116        Option<crate::lifecycle::network_event::DebounceConfig>,
117
118    /// Request deduplication state (15 s TTL response cache, prevents double-processing on retry)
119    pub(crate) dedup_state: Arc<Mutex<DedupState>>,
120
121    /// Verified package manifest for package-backed nodes.
122    #[allow(dead_code)]
123    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,
124
125    /// Pre-issued registration credential injected by the Hyper layer during
126    /// the `Attached → Registered` state transition. `start()` uses it directly
127    /// instead of re-registering with the signaling server.
128    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,
129
130    /// Shared WebSocket direct-connect address map populated by discovery
131    ///
132    /// Shared with `DefaultWireBuilder` so discovered ws:// URLs can be reused
133    /// directly instead of relying on a static url_template
134    /// The map is keyed by `ActrId`.
135    pub(crate) discovered_ws_addresses:
136        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,
137
138    /// Runtime workload (WASM, dynclib, etc.)
139    ///
140    /// `handle_incoming` dispatches through this workload.
141    ///
142    /// The `Mutex` serializes dispatch into a single guest actor instance:
143    /// `WasmWorkload::handle` and `DynClibWorkload::handle` both take
144    /// `&mut self` because the underlying Wasmtime `Store` / native guest
145    /// ABI is single-threaded, so concurrent dispatch through the same
146    /// instance would be unsound. Lifecycle hooks also take this lock because
147    /// package-backed WASM / dynclib workloads expose them on the same guest
148    /// instance; transport and other observation hooks reach linked workloads
149    /// through `hook_observer` without holding this lock.
150    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
151
152    /// Optional shell-side observer that receives linked-workload transport /
153    /// credential / mailbox hook invocations.
154    ///
155    /// `None` means "no observer installed"; the built-in tracing defaults
156    /// still fire from the event-source wiring sites. When `Some`, hook
157    /// invocations are dispatched through `lifecycle::hooks::spawn_hook`
158    /// so panics in observer code cannot unwind into the event source.
159    #[allow(dead_code)]
160    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,
161
162    /// Queue-length threshold at which the mailbox backpressure
163    /// watchdog fires the framework `on_mailbox_backpressure` hook.
164    ///
165    /// Resolved from [`HyperConfig`] at node construction time so the
166    /// runtime loop does not need to hold a reference back to `HyperConfig`.
167    pub(crate) mailbox_backpressure_threshold: usize,
168
169    /// Lead time before credential expiry at which the framework fires
170    /// the `on_credential_expiring` hook. Resolved from [`HyperConfig`]
171    /// at node construction time.
172    #[allow(dead_code)]
173    pub(crate) credential_expiry_warning: Duration,
174}
175
176/// Credential state for shared access between tasks
177#[derive(Clone)]
178pub struct CredentialState {
179    inner: Arc<RwLock<CredentialStateInner>>,
180}
181
182#[derive(Clone)]
183struct CredentialStateInner {
184    credential: AIdCredential,
185    expires_at: Option<prost_types::Timestamp>,
186    /// HMAC time-limited TURN credential, updated together with credential on registration/renewal
187    turn_credential: Option<TurnCredential>,
188}
189
190impl CredentialState {
191    /// Create a new CredentialState with TURN credential
192    pub fn new(
193        credential: AIdCredential,
194        expires_at: Option<prost_types::Timestamp>,
195        turn_credential: Option<TurnCredential>,
196    ) -> Self {
197        Self {
198            inner: Arc::new(RwLock::new(CredentialStateInner {
199                credential,
200                expires_at,
201                turn_credential,
202            })),
203        }
204    }
205
206    pub async fn credential(&self) -> AIdCredential {
207        self.inner.read().await.credential.clone()
208    }
209
210    pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
211        self.inner.read().await.expires_at
212    }
213
214    /// Get TURN credential (HMAC time-limited credential)
215    pub async fn turn_credential(&self) -> Option<TurnCredential> {
216        self.inner.read().await.turn_credential.clone()
217    }
218
219    /// Update credential and TURN credential
220    ///
221    /// Called on credential renewal; only overwrites the old TURN credential when the new one is not empty
222    pub(crate) async fn update(
223        &self,
224        credential: AIdCredential,
225        expires_at: Option<prost_types::Timestamp>,
226        turn_credential: Option<TurnCredential>,
227    ) {
228        let mut guard = self.inner.write().await;
229        guard.credential = credential;
230        guard.expires_at = expires_at;
231        if turn_credential.is_some() {
232            guard.turn_credential = turn_credential;
233        }
234    }
235}
236
237/// Host operation executor - routes guest outbound calls through RuntimeContext
238///
239/// Called by the workload dispatch path in `handle_incoming`.
240async fn host_operation_handler(
241    ctx: crate::context::RuntimeContext,
242    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
243    pending: crate::workload::HostOperation,
244) -> crate::workload::HostOperationResult {
245    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
246    use actr_framework::guest::dynclib_abi::code as abi_code;
247    use actr_framework::{Context as _, Dest};
248    use actr_protocol::{DataStream, PayloadType};
249
250    /// Map `ActrError` to ABI error code, preserving semantics for guest-side discrimination
251    fn actr_error_to_code(err: &ActrError) -> i32 {
252        match err {
253            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
254            _ => abi_code::GENERIC_ERROR,
255        }
256    }
257
258    match pending {
259        HostOperation::CallRaw(req) => {
260            match ctx
261                .call_raw(
262                    &Dest::Actor(req.target),
263                    req.route_key,
264                    PayloadType::RpcReliable,
265                    bytes::Bytes::from(req.payload),
266                    30_000,
267                )
268                .await
269            {
270                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
271                Err(e) => {
272                    tracing::error!("call_raw routing failed: {e:?}");
273                    HostOperationResult::Error(actr_error_to_code(&e))
274                }
275            }
276        }
277
278        HostOperation::Call(req) => {
279            let dest = match decode_dest(&req.dest) {
280                Some(d) => d,
281                None => {
282                    tracing::error!(route_key = req.route_key, "call: dest decode failed");
283                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
284                }
285            };
286            match ctx
287                .call_raw(
288                    &dest,
289                    req.route_key,
290                    PayloadType::RpcReliable,
291                    bytes::Bytes::from(req.payload),
292                    30_000,
293                )
294                .await
295            {
296                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
297                Err(e) => {
298                    tracing::error!("call routing failed: {e:?}");
299                    HostOperationResult::Error(actr_error_to_code(&e))
300                }
301            }
302        }
303
304        HostOperation::Tell(req) => {
305            let dest = match decode_dest(&req.dest) {
306                Some(d) => d,
307                None => {
308                    tracing::error!(route_key = req.route_key, "tell: dest decode failed");
309                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
310                }
311            };
312            match ctx
313                .tell_raw(
314                    &dest,
315                    req.route_key,
316                    PayloadType::RpcReliable,
317                    bytes::Bytes::from(req.payload),
318                )
319                .await
320            {
321                Ok(()) => HostOperationResult::Done,
322                Err(e) => {
323                    tracing::error!("tell routing failed: {e:?}");
324                    HostOperationResult::Error(actr_error_to_code(&e))
325                }
326            }
327        }
328
329        HostOperation::Discover(req) => {
330            match ctx.discover_route_candidate(&req.target_type).await {
331                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
332                Err(e) => {
333                    tracing::error!("discover failed: {e:?}");
334                    HostOperationResult::Error(actr_error_to_code(&e))
335                }
336            }
337        }
338
339        HostOperation::RegisterStream(req) => {
340            let stream_id = req.stream_id;
341            let callback_ctx = ctx.clone();
342            let callback_workload_dispatch = workload_dispatch.clone();
343            match ctx
344                .register_stream(stream_id, move |chunk: DataStream, sender| {
345                    let ctx_for_executor = callback_ctx.clone();
346                    let workload_dispatch = callback_workload_dispatch.clone();
347                    Box::pin(async move {
348                        let invocation = crate::workload::InvocationContext {
349                            self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
350                            caller_id: Some(sender.clone()),
351                            request_id: format!(
352                                "data-stream:{}:{}",
353                                chunk.stream_id, chunk.sequence
354                            ),
355                        };
356                        let call_executor: crate::workload::HostAbiFn =
357                            std::sync::Arc::new(move |pending| {
358                                let ctx = ctx_for_executor.clone();
359                                Box::pin(async move {
360                                    stream_callback_host_operation_handler(ctx, pending).await
361                                })
362                            });
363                        let mut guard = workload_dispatch.lock().await;
364                        guard
365                            .dispatch_data_stream(chunk, sender, invocation, &call_executor)
366                            .await
367                    })
368                })
369                .await
370            {
371                Ok(()) => HostOperationResult::Done,
372                Err(e) => {
373                    tracing::error!("register_stream failed: {e:?}");
374                    HostOperationResult::Error(actr_error_to_code(&e))
375                }
376            }
377        }
378
379        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
380            Ok(()) => HostOperationResult::Done,
381            Err(e) => {
382                tracing::error!("unregister_stream failed: {e:?}");
383                HostOperationResult::Error(actr_error_to_code(&e))
384            }
385        },
386
387        HostOperation::SendDataStream(req) => {
388            let dest = match decode_dest(&req.dest) {
389                Some(d) => d,
390                None => {
391                    tracing::error!("send_data_stream: dest decode failed");
392                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
393                }
394            };
395            let payload_type = match PayloadType::try_from(req.payload_type) {
396                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
397                    PayloadType::try_from(req.payload_type).expect("checked payload type")
398                }
399                Ok(other) => {
400                    tracing::error!(?other, "send_data_stream: invalid stream payload type");
401                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
402                }
403                Err(_) => {
404                    tracing::error!(
405                        payload_type = req.payload_type,
406                        "send_data_stream: unknown payload type"
407                    );
408                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
409                }
410            };
411            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
412                Ok(()) => HostOperationResult::Done,
413                Err(e) => {
414                    tracing::error!("send_data_stream failed: {e:?}");
415                    HostOperationResult::Error(actr_error_to_code(&e))
416                }
417            }
418        }
419    }
420}
421
422fn lifecycle_invocation(
423    actor_id: &ActrId,
424    request_id: &'static str,
425) -> crate::workload::InvocationContext {
426    crate::workload::InvocationContext {
427        self_id: actor_id.clone(),
428        caller_id: None,
429        request_id: request_id.to_string(),
430    }
431}
432
433pub(crate) fn lifecycle_host_abi(
434    ctx: crate::context::RuntimeContext,
435    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
436) -> crate::workload::HostAbiFn {
437    std::sync::Arc::new(move |pending| {
438        let ctx = ctx.clone();
439        let workload_dispatch = workload_dispatch.clone();
440        Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
441    })
442}
443
444async fn stream_callback_host_operation_handler(
445    ctx: crate::context::RuntimeContext,
446    pending: crate::workload::HostOperation,
447) -> crate::workload::HostOperationResult {
448    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
449    use actr_framework::guest::dynclib_abi::code as abi_code;
450    use actr_framework::{Context as _, Dest};
451    use actr_protocol::PayloadType;
452
453    fn actr_error_to_code(err: &ActrError) -> i32 {
454        match err {
455            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
456            _ => abi_code::GENERIC_ERROR,
457        }
458    }
459
460    match pending {
461        HostOperation::CallRaw(req) => {
462            match ctx
463                .call_raw(
464                    &Dest::Actor(req.target),
465                    req.route_key,
466                    PayloadType::RpcReliable,
467                    bytes::Bytes::from(req.payload),
468                    30_000,
469                )
470                .await
471            {
472                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
473                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
474            }
475        }
476        HostOperation::Call(req) => {
477            let dest = match decode_dest(&req.dest) {
478                Some(d) => d,
479                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
480            };
481            match ctx
482                .call_raw(
483                    &dest,
484                    req.route_key,
485                    PayloadType::RpcReliable,
486                    bytes::Bytes::from(req.payload),
487                    30_000,
488                )
489                .await
490            {
491                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
492                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
493            }
494        }
495        HostOperation::Tell(req) => {
496            let dest = match decode_dest(&req.dest) {
497                Some(d) => d,
498                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
499            };
500            match ctx
501                .tell_raw(
502                    &dest,
503                    req.route_key,
504                    PayloadType::RpcReliable,
505                    bytes::Bytes::from(req.payload),
506                )
507                .await
508            {
509                Ok(()) => HostOperationResult::Done,
510                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
511            }
512        }
513        HostOperation::Discover(req) => {
514            match ctx.discover_route_candidate(&req.target_type).await {
515                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
516                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
517            }
518        }
519        HostOperation::RegisterStream(_) => {
520            tracing::error!("register_stream from inside a stream callback is not supported");
521            HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
522        }
523        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
524            Ok(()) => HostOperationResult::Done,
525            Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
526        },
527        HostOperation::SendDataStream(req) => {
528            let dest = match decode_dest(&req.dest) {
529                Some(d) => d,
530                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
531            };
532            let payload_type = match PayloadType::try_from(req.payload_type) {
533                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
534                    PayloadType::try_from(req.payload_type).expect("checked payload type")
535                }
536                Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
537            };
538            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
539                Ok(()) => HostOperationResult::Done,
540                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
541            }
542        }
543    }
544}
545
546/// Map ActrError to error code for ErrorResponse
547fn protocol_error_to_code(err: &ActrError) -> u32 {
548    match err {
549        ActrError::Unavailable(_) => 503,            // Service Unavailable
550        ActrError::ConnectionNotReady(_) => 503,     // Service Unavailable (send preflight)
551        ActrError::TimedOut => 504,                  // Gateway Timeout
552        ActrError::NotFound(_) => 404,               // Not Found
553        ActrError::PermissionDenied(_) => 403,       // Forbidden
554        ActrError::InvalidArgument(_) => 400,        // Bad Request
555        ActrError::UnknownRoute(_) => 404,           // Not Found - route not found
556        ActrError::DependencyNotFound { .. } => 400, // Bad Request
557        ActrError::DecodeFailure(_) => 400,          // Bad Request - decode failure
558        ActrError::NotImplemented(_) => 501,         // Not Implemented
559        ActrError::Internal(_) => 500,               // Internal Server Error
560    }
561}
562
563impl Inner {
564    #[allow(dead_code)]
565    pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
566        self.package_manifest.as_ref()
567    }
568
569    /// Network event processing loop (background task)
570    ///
571    /// # Responsibilities
572    /// - Receive network events from Channel
573    /// - Delegate to NetworkEventProcessor for handling
574    /// - Record processing time and send results
575    async fn network_event_loop(
576        event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>,
577        event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
578        shutdown_token: CancellationToken,
579    ) {
580        crate::lifecycle::network_event::run_network_event_reconciler(
581            event_rx,
582            event_processor,
583            shutdown_token,
584        )
585        .await;
586    }
587
588    fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
589        if timeout_ms > 0 {
590            Duration::from_millis(timeout_ms as u64)
591        } else {
592            DEDUP_TTL
593        }
594    }
595
596    async fn wait_for_inflight_duplicate(
597        mut waiter: DedupWaiter,
598        timeout: Duration,
599    ) -> ActorResult<Bytes> {
600        let wait_for_result = async {
601            loop {
602                if let Some(result) = waiter.borrow().clone() {
603                    return result;
604                }
605
606                if waiter.changed().await.is_err() {
607                    if let Some(result) = waiter.borrow().clone() {
608                        return result;
609                    }
610                    return Err(ActrError::Unavailable(
611                        "duplicate request result unavailable".to_string(),
612                    ));
613                }
614            }
615        };
616
617        match tokio::time::timeout(timeout, wait_for_result).await {
618            Ok(result) => result,
619            Err(_) => Err(ActrError::Unavailable(format!(
620                "duplicate request in-flight timed out after {}ms",
621                timeout.as_millis()
622            ))),
623        }
624    }
625
626    /// - Single-hop calls: effectively identical
627    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
628    #[cfg_attr(
629        feature = "opentelemetry",
630        tracing::instrument(
631            skip_all,
632            name = "ActrNode.handle_incoming",
633            fields(
634                actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
635                route_key = %envelope.route_key,
636                request_id = %envelope.request_id,
637            )
638        )
639    )]
640    pub async fn handle_incoming(
641        &self,
642        envelope: RpcEnvelope,
643        caller_id: Option<&ActrId>,
644    ) -> ActorResult<Bytes> {
645        // Log received message
646        if let Some(caller) = caller_id {
647            tracing::debug!(
648                "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
649                envelope.route_key,
650                caller,
651                envelope.request_id
652            );
653        } else {
654            tracing::debug!(
655                "📨 Handling incoming message: route_key={}, request_id={}",
656                envelope.route_key,
657                envelope.request_id
658            );
659        }
660
661        // 0. Get actor_id early for ACL check
662        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
663            ActrError::Internal(
664                "Actor ID not set - node must be started before handling messages".to_string(),
665            )
666        })?;
667
668        // 0.1. ACL Permission Check (before processing message)
669        let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
670            .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
671
672        if !acl_allowed {
673            tracing::warn!(
674                severity = 5,
675                error_category = "acl_denied",
676                request_id = %envelope.request_id,
677                route_key = %envelope.route_key,
678                caller = %caller_id
679                    .map(|c| c.to_string())
680                    .unwrap_or_else(|| "<none>".to_string()),
681                "🚫 ACL: Permission denied"
682            );
683
684            return Err(ActrError::PermissionDenied(format!(
685                "ACL denied: {} is not allowed to call {}",
686                caller_id
687                    .map(|c| c.to_string())
688                    .unwrap_or_else(|| "<unknown>".to_string()),
689                actor_id
690            )));
691        }
692
693        // 0.2. Deduplication: return cached response for retried request_ids
694        let outcome = {
695            self.dedup_state
696                .lock()
697                .await
698                .check_or_mark(&envelope.request_id)
699        };
700        match outcome {
701            DedupOutcome::Fresh => {} // proceed normally
702            DedupOutcome::InFlight(waiter) => {
703                tracing::debug!(
704                    request_id = %envelope.request_id,
705                    route_key = %envelope.route_key,
706                    "duplicate request in-flight; waiting for original result"
707                );
708                return Self::wait_for_inflight_duplicate(
709                    waiter,
710                    Self::duplicate_wait_timeout(envelope.timeout_ms),
711                )
712                .await;
713            }
714            DedupOutcome::Duplicate(cached) => {
715                tracing::debug!(
716                    request_id = %envelope.request_id,
717                    route_key = %envelope.route_key,
718                    "♻️ returning cached response for duplicate request_id"
719                );
720                return cached;
721            }
722        }
723
724        // 1. Create Context with caller_id from transport layer
725        let credential_state = self.credential_state.clone().ok_or_else(|| {
726            ActrError::Internal(
727                "Credential not set - node must be started before handling messages".to_string(),
728            )
729        })?;
730        let ctx = self.make_runtime_context(
731            actor_id,
732            caller_id, // caller_id from transport layer (MessageRecord.from)
733            &envelope.request_id,
734            &credential_state.credential().await,
735        );
736
737        // 2. Dispatch
738        let dispatch_ctx = crate::workload::InvocationContext {
739            self_id: actor_id.clone(),
740            caller_id: caller_id.cloned(),
741            request_id: envelope.request_id.clone(),
742        };
743        let ctx_for_executor = ctx.clone();
744        let workload_for_executor = self.workload_dispatch.clone();
745        let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
746            let ctx = ctx_for_executor.clone();
747            let workload_dispatch = workload_for_executor.clone();
748            Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
749        });
750
751        let mut guard = self.workload_dispatch.lock().await;
752        let result = guard
753            .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
754            .await
755            .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e:?}")));
756
757        match &result {
758            Ok(_) => tracing::debug!(
759                request_id = %envelope.request_id,
760                route_key = %envelope.route_key,
761                "✅ Message handled successfully"
762            ),
763            Err(e) => tracing::error!(
764                severity = 6,
765                error_category = "handler_error",
766                request_id = %envelope.request_id,
767                route_key = %envelope.route_key,
768                "❌ Message handling failed: {:?}", e
769            ),
770        }
771
772        // 3. Store completed result in dedup cache before returning
773        self.dedup_state
774            .lock()
775            .await
776            .complete(&envelope.request_id, result.clone());
777
778        result
779    }
780
781    /// Build a new `Inner` from config and runtime workload.
782    ///
783    /// This is the internal constructor behind the public node builders and
784    /// Hyper package attach helpers.
785    pub(crate) async fn build(
786        config: actr_config::RuntimeConfig,
787        workload: crate::workload::Workload,
788        package_manifest: Option<actr_pack::PackageManifest>,
789        packaged_lock: Option<actr_config::lock::LockFile>,
790        mailbox_backpressure_threshold: usize,
791        credential_expiry_warning: Duration,
792    ) -> ActorResult<Self> {
793        use crate::outbound::{Gate, HostGate};
794        use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
795
796        tracing::info!("🚀 Initializing ActrNode");
797
798        // Initialize Mailbox
799        let mailbox_path = config
800            .mailbox_path
801            .as_ref()
802            .map(|p| p.to_string_lossy().to_string())
803            .unwrap_or_else(|| ":memory:".to_string());
804
805        tracing::info!("📂 Mailbox database path: {}", mailbox_path);
806
807        let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
808            actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
809                .await
810                .map_err(|e| {
811                    actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
812                })?,
813        );
814
815        // Initialize Dead Letter Queue
816        let dlq_path = if mailbox_path == ":memory:" {
817            ":memory:".to_string()
818        } else {
819            format!("{mailbox_path}.dlq")
820        };
821
822        let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
823            actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
824                .await
825                .map_err(|e| {
826                    actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
827                })?,
828        );
829        tracing::info!("✅ Dead Letter Queue initialized");
830
831        // Initialize signaling client
832        let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
833            Some("answer".to_string())
834        } else {
835            None
836        };
837
838        let signaling_config = SignalingConfig {
839            server_url: config.signaling_url.clone(),
840            connection_timeout: 30,
841            heartbeat_interval: 30,
842            reconnect_config: ReconnectConfig::default(),
843            auth_config: None,
844            webrtc_role,
845        };
846
847        let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
848        client.start_reconnect_manager();
849        let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
850
851        // Initialize inproc infrastructure (Shell ↔ Guest)
852        let shell_to_workload = Arc::new(HostTransport::new());
853        let workload_to_shell = Arc::new(HostTransport::new());
854        let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
855
856        let data_stream_registry = Arc::new(DataStreamRegistry::new());
857        let media_frame_registry = Arc::new(MediaFrameRegistry::new());
858
859        tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
860
861        let actr_lock = if let Some(lock) = packaged_lock {
862            tracing::info!(
863                "📋 Loaded packaged manifest.lock.toml with {} dependencies",
864                lock.dependencies.len()
865            );
866            Some(Arc::new(lock))
867        } else {
868            tracing::warn!(
869                "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
870            );
871            None
872        };
873
874        tracing::info!("✅ ActrNode initialized");
875
876        Ok(Self {
877            config,
878            mailbox,
879            dlq,
880            inproc_gate,
881            outproc_gate: None, // Populated in start() once WebRTC / PeerGate is ready.
882            data_stream_registry,
883            media_frame_registry,
884            signaling_client,
885            actor_id: None,
886            credential_state: None,
887            webrtc_coordinator: None,
888            peer_transport: None,
889            webrtc_gate: None,
890            websocket_gate: None,
891            shell_to_workload: Some(shell_to_workload),
892            workload_to_shell: Some(workload_to_shell),
893            shutdown_token: CancellationToken::new(),
894            actr_lock,
895            network_event_rx: None,
896            network_event_debounce_config: None,
897            dedup_state: Arc::new(Mutex::new(DedupState::new())),
898            package_manifest,
899            preregistered_credential: None,
900            discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
901                std::collections::HashMap::new(),
902            )),
903            workload_dispatch: Arc::new(Mutex::new(workload)),
904            hook_observer: None,
905            mailbox_backpressure_threshold,
906            credential_expiry_warning,
907        })
908    }
909
910    /// Snapshot the current runtime handles into a `BootstrapContextBuilder`.
911    ///
912    /// The returned builder is cloned into long-lived hook closures and into
913    /// `ActrRefShared` so those paths can materialize bootstrap contexts
914    /// without retaining a reference back to `Inner`. The snapshot freezes
915    /// `outproc_gate` and `actr_lock` at call time — callers that want to
916    /// observe a later-initialized `outproc_gate` must rebuild.
917    pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
918        BootstrapContextBuilder::new(
919            self.inproc_gate.clone(),
920            self.outproc_gate.clone(),
921            self.data_stream_registry.clone(),
922            self.media_frame_registry.clone(),
923            self.signaling_client.clone(),
924            self.actr_lock.clone(),
925        )
926    }
927
928    /// Build a `RuntimeContext` for the per-request dispatch path.
929    ///
930    /// Unlike `BootstrapContextBuilder::build_bootstrap`, this carries the
931    /// envelope's caller identity and request id through into the context.
932    pub(crate) fn make_runtime_context(
933        &self,
934        self_id: &ActrId,
935        caller_id: Option<&ActrId>,
936        request_id: &str,
937        credential: &AIdCredential,
938    ) -> RuntimeContext {
939        RuntimeContext::new(
940            self_id.clone(),
941            caller_id.cloned(),
942            request_id.to_string(),
943            self.inproc_gate.clone(),
944            self.outproc_gate.clone(),
945            self.data_stream_registry.clone(),
946            self.media_frame_registry.clone(),
947            self.signaling_client.clone(),
948            credential.clone(),
949            self.actr_lock.clone(),
950        )
951    }
952
953    /// Create network event processing infrastructure (called on demand, before `start()`).
954    ///
955    /// # Parameters
956    /// - `debounce_ms`: Debounce window in milliseconds. If 0, no debounce.
957    ///
958    /// # Panics
959    /// Panics if called more than once.
960    pub fn create_network_event_handle(
961        &mut self,
962        debounce_ms: u64,
963    ) -> crate::lifecycle::NetworkEventHandle {
964        if self.network_event_rx.is_some() {
965            panic!("create_network_event_handle() can only be called once");
966        }
967
968        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
969
970        let debounce_config = if debounce_ms > 0 {
971            Some(crate::lifecycle::network_event::DebounceConfig {
972                window: std::time::Duration::from_millis(debounce_ms),
973            })
974        } else {
975            None
976        };
977
978        self.network_event_rx = Some(event_rx);
979        self.network_event_debounce_config = debounce_config;
980
981        tracing::info!(
982            debounce_ms,
983            channel_capacity = 100_u64,
984            "network_event.node.handle_created"
985        );
986
987        crate::lifecycle::NetworkEventHandle::new(event_tx)
988    }
989
990    /// Attach a credential already issued by AIS so that `start()` can skip
991    /// the signaling registration step.
992    ///
993    /// Called by the Hyper layer between `Hyper::register()` and `Hyper::start()`.
994    pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
995        tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
996        self.preregistered_credential = Some(register_ok);
997    }
998
999    /// Start the system
1000    pub async fn start(mut self) -> ActorResult<ActrRef> {
1001        tracing::info!("🚀 Starting ActrNode");
1002        tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1003
1004        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1005        // 1. Build RegisterRequest
1006        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1007        // Get ActrType from configuration
1008        let actr_type = self.config.actr_type().clone();
1009        tracing::info!("📋 Actor type: {}", actr_type);
1010
1011        // ServiceSpec is derived by the Hyper layer from the verified package
1012        // (see `service_spec::calculate_service_spec_from_package`). The raw
1013        // ActrNode::start() path has no package context and always sends None
1014        // on its own RegisterRequest; callers that need a spec must go
1015        // through `Hyper::register()`.
1016        let service_spec = None;
1017
1018        // If a WebSocket listen port is configured, build the advertised ws:// address
1019        // to register with the signaling server so clients can discover it.
1020        let ws_address = if let Some(port) = self.config.websocket_listen_port {
1021            let host = self
1022                .config
1023                .websocket_advertised_host
1024                .as_deref()
1025                .unwrap_or("127.0.0.1");
1026            Some(format!("ws://{}:{}", host, port))
1027        } else {
1028            None
1029        };
1030
1031        if let Some(ref addr) = ws_address {
1032            tracing::info!(
1033                "📡 Advertising WebSocket address to signaling server: {}",
1034                addr
1035            );
1036        }
1037
1038        let register_request = RegisterRequest {
1039            actr_type: actr_type.clone(),
1040            realm: self.config.realm,
1041            service_spec,
1042            acl: self.config.acl.clone(),
1043            service: None,
1044            ws_address,
1045            auth_mode: Some(RegisterAuthMode::Linked as i32),
1046            ..Default::default()
1047        };
1048
1049        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1050        // 1. Obtain registration info (Hyper pre-injected or AIS HTTP)
1051        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1052        let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1053            tracing::info!(
1054                "Using Hyper pre-injected registration credential; skipping AIS registration"
1055            );
1056            injected
1057        } else {
1058            let ais_endpoint = &self.config.ais_endpoint;
1059            tracing::info!(
1060                ais_endpoint = %ais_endpoint,
1061                "Registering actor with AIS via HTTP"
1062            );
1063            let mut ais = AisClient::new(ais_endpoint);
1064            if let Some(ref secret) = self.config.realm_secret {
1065                ais = ais.with_realm_secret(secret);
1066            }
1067            let resp = ais
1068                .register_linked(register_request.clone())
1069                .await
1070                .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1071            match resp.result {
1072                Some(register_response::Result::Success(ok)) => {
1073                    tracing::info!("✅ AIS HTTP registration successful");
1074                    ok
1075                }
1076                Some(register_response::Result::Error(error)) => {
1077                    tracing::error!(
1078                        severity = 10,
1079                        error_category = "registration_error",
1080                        error_code = error.code,
1081                        "❌ AIS registration failed: code={}, message={}",
1082                        error.code,
1083                        error.message
1084                    );
1085                    return Err(ActrError::Unavailable(format!(
1086                        "AIS registration rejected: {} (code: {})",
1087                        error.message, error.code
1088                    )));
1089                }
1090                None => {
1091                    tracing::error!(
1092                        severity = 10,
1093                        error_category = "registration_error",
1094                        "❌ AIS registration response missing result"
1095                    );
1096                    return Err(ActrError::Unavailable(
1097                        "Invalid AIS registration response: missing result".to_string(),
1098                    ));
1099                }
1100            }
1101        };
1102
1103        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1104        // 3. Set credential on signaling client, then connect signaling WS
1105        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1106        // The signaling server requires credential params in the WS URL for
1107        // authentication. We must set actor_id + credential BEFORE connecting
1108        // so that build_url_with_identity() includes them in the query string.
1109        let pre_connect_credential_state = {
1110            let actor_id = register_ok.actr_id.clone();
1111            let credential_state = CredentialState::new(
1112                register_ok.credential.clone(),
1113                register_ok.credential_expires_at,
1114                Some(register_ok.turn_credential.clone()),
1115            );
1116            self.signaling_client.set_actor_id(actor_id).await;
1117            self.signaling_client
1118                .set_credential_state(credential_state.clone())
1119                .await;
1120            credential_state
1121        };
1122
1123        // Install the signaling-side hook callback so that
1124        // SignalingConnectStart / Connected / Disconnected events flow
1125        // through the framework tracing defaults and into a
1126        // user-installed observer. Done BEFORE connect() so the initial
1127        // attempt produces a SignalingConnectStart event.
1128        {
1129            let actor_id = register_ok.actr_id.clone();
1130            let credential_state = pre_connect_credential_state.clone();
1131            // Snapshot at this point — outproc_gate is still None here, so
1132            // signaling-event contexts will carry None for outproc_gate
1133            // (matching the pre-existing behavior prior to B13 refactor).
1134            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1135            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1136                let snapshot = ctx_builder_snapshot.clone();
1137                let actor_id = actor_id.clone();
1138                let credential_state = credential_state.clone();
1139                Box::pin(async move {
1140                    Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1141                })
1142            });
1143            let cb = crate::lifecycle::hooks::build_hook_callback(
1144                self.hook_observer.clone(),
1145                ctx_builder,
1146            );
1147            self.signaling_client.set_hook_callback(cb);
1148        }
1149
1150        tracing::info!("📡 Connecting to signaling server (with credential)");
1151        self.signaling_client
1152            .connect()
1153            .await
1154            .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1155        tracing::info!("✅ Connected to signaling server");
1156
1157        // Collect background task handles so they can be managed by ActrRefShared later.
1158        let mut task_handles = Vec::new();
1159
1160        // Node-level hook callback, built inside the registration
1161        // setup block below and published back out into this wider
1162        // scope so the mailbox backpressure watchdog can subscribe.
1163        let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1164
1165        {
1166            let actor_id = register_ok.actr_id;
1167            let credential = register_ok.credential;
1168
1169            tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1170            tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1171            tracing::info!(
1172                "💓 Signaling heartbeat interval: {} seconds",
1173                register_ok.signaling_heartbeat_interval_secs
1174            );
1175
1176            // TurnCredential is a required field; should always be present under normal registration.
1177            tracing::debug!("TurnCredential received, TURN authentication ready");
1178
1179            if let Some(expires_at) = &register_ok.credential_expires_at {
1180                tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1181            }
1182
1183            // Store ActrId and credential state
1184            self.actor_id = Some(actor_id.clone());
1185            let credential_state = CredentialState::new(
1186                credential,
1187                register_ok.credential_expires_at,
1188                Some(register_ok.turn_credential.clone()),
1189            );
1190            self.credential_state = Some(credential_state.clone());
1191
1192            // Build the node-level lifecycle hook callback once: it is
1193            // reused for the initial `on_credential_renewed`, handed to
1194            // the heartbeat task for subsequent credential events, and
1195            // handed to the mailbox backpressure watchdog for
1196            // `on_mailbox_backpressure` on rising-edge crossings.
1197            //
1198            // The signaling layer already has its own callback installed
1199            // above — this second callback only carries credential and
1200            // mailbox-backpressure events, so no overlap with the
1201            // signaling-event plumbing.
1202            node_hook_callback =
1203                {
1204                    let actor_id_for_hook = actor_id.clone();
1205                    let credential_state_for_hook = credential_state.clone();
1206                    // Snapshot at this point — outproc_gate is still None
1207                    // here; credential / mailbox hook contexts inherit that
1208                    // and therefore cannot issue Dest::Actor(_) calls (same
1209                    // behavior as before B13 refactor).
1210                    let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1211                    let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1212                        Arc::new(move || {
1213                            let snapshot = ctx_builder_snapshot.clone();
1214                            let actor_id = actor_id_for_hook.clone();
1215                            let credential_state = credential_state_for_hook.clone();
1216                            Box::pin(async move {
1217                                Some(snapshot.build_bootstrap(
1218                                    &actor_id,
1219                                    &credential_state.credential().await,
1220                                ))
1221                            })
1222                        });
1223                    Some(crate::lifecycle::hooks::build_hook_callback(
1224                        self.hook_observer.clone(),
1225                        ctx_builder,
1226                    ))
1227                };
1228
1229            // Fire `on_credential_renewed` at initial registration: the
1230            // credential is considered "renewed" from "nothing" to the
1231            // value just issued by AIS. Subsequent renewals fire the
1232            // same hook from `lifecycle::heartbeat`.
1233            if let Some(expires_at) = &register_ok.credential_expires_at {
1234                let new_expiry = std::time::UNIX_EPOCH
1235                    + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1236                if let Some(cb) = node_hook_callback.as_ref() {
1237                    cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1238                } else {
1239                    tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1240                }
1241            }
1242
1243            // Note: actor_id and credential_state were already set on signaling_client
1244            // before connect (step 3 above), so reconnect URLs already carry correct auth.
1245
1246            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1247            // 1.3. Inproc transports were filled in during `build()`; nothing
1248            //      to stage here now that ContextFactory has been removed.
1249            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1250            tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1251
1252            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1253            // 1.5. Create WebRTC infrastructure
1254            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1255            tracing::info!("🌐 Initializing WebRTC infrastructure");
1256
1257            let media_frame_registry = self.media_frame_registry.clone();
1258
1259            // Create WebRtcCoordinator
1260            let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1261                actor_id.clone(),
1262                credential_state.clone(),
1263                self.signaling_client.clone(),
1264                self.config.webrtc.clone(),
1265                media_frame_registry,
1266            ));
1267
1268            // Install the WebRTC hook callback — fires
1269            // WebRtcConnectStart / Connected (with relayed info) /
1270            // Disconnected HookEvents on every peer state change.
1271            {
1272                let actor_id_for_hook = actor_id.clone();
1273                let credential_state_for_hook = credential_state.clone();
1274                // Snapshot before outproc_gate is wired up (just below). This
1275                // preserves the pre-refactor behavior where WebRTC-event
1276                // hook contexts carry outproc_gate = None.
1277                let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1278                let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1279                    Arc::new(move || {
1280                        let snapshot = ctx_builder_snapshot.clone();
1281                        let actor_id = actor_id_for_hook.clone();
1282                        let credential_state = credential_state_for_hook.clone();
1283                        Box::pin(async move {
1284                            Some(
1285                                snapshot.build_bootstrap(
1286                                    &actor_id,
1287                                    &credential_state.credential().await,
1288                                ),
1289                            )
1290                        })
1291                    });
1292                let cb = crate::lifecycle::hooks::build_hook_callback(
1293                    self.hook_observer.clone(),
1294                    ctx_builder,
1295                );
1296                coordinator.set_hook_callback(cb);
1297            }
1298
1299            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1300            // 1.6. Create PeerTransport + PeerGate (new architecture)
1301            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1302            tracing::info!("🏗️  Creating PeerTransport with WebRTC support");
1303
1304            // Create DefaultWireBuilder with WebRTC coordinator
1305            use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1306
1307            // WebSocket channel always enabled: target ws:// address is fully discovered at runtime
1308            // Direct-connect mode: encode local node ActrId as hex, sent as X-Actr-Node-Id
1309            let local_id_hex = hex::encode(actor_id.encode_to_vec());
1310            let wire_builder_config = DefaultWireBuilderConfig {
1311                local_id_hex,
1312                enable_webrtc: true,
1313                enable_websocket: true,
1314                // Share the discovered_ws_addresses map so that post-discovery calls
1315                // can use the signaling-provided ws:// URL for this actor node.
1316                discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1317                // Pass credential_state so outbound WS handshake carries X-Actr-Credential,
1318                // enabling peer WebSocketGate to perform Ed25519 signature verification.
1319                credential_state: Some(credential_state.clone()),
1320            };
1321            let wire_builder = Arc::new(DefaultWireBuilder::new(
1322                Some(coordinator.clone()),
1323                wire_builder_config,
1324            ));
1325
1326            // Create PeerTransport
1327            use crate::transport::PeerTransport;
1328            let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1329            self.peer_transport = Some(transport_manager.clone());
1330
1331            // Create PeerGate with WebRTC coordinator for MediaTrack support
1332            use crate::outbound::PeerGate;
1333            let outproc_gate =
1334                Arc::new(PeerGate::new(transport_manager, Some(coordinator.clone())));
1335            let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1336            tracing::info!("PeerTransport + PeerGate initialized");
1337
1338            let data_stream_registry = self.data_stream_registry.clone();
1339
1340            // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1341            let pending_requests = outproc_gate.get_pending_requests();
1342            let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1343                coordinator.clone(),
1344                pending_requests,
1345                data_stream_registry.clone(),
1346            ));
1347            // Set local_id
1348            gate.set_local_id(actor_id.clone()).await;
1349            tracing::info!(
1350                "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1351            );
1352
1353            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1354            // 1.7. Wire the outproc gate into Inner so subsequent
1355            //      `make_runtime_context` / `bootstrap_ctx_builder` calls
1356            //      observe it. All per-request contexts created by
1357            //      `handle_incoming` go through this field live.
1358            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1359            tracing::info!("🔧 Wiring outproc_gate into node");
1360            self.outproc_gate = Some(outproc_gate_enum);
1361            tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1362
1363            // Save references
1364            self.webrtc_coordinator = Some(coordinator.clone());
1365            self.webrtc_gate = Some(gate.clone());
1366            tracing::info!("✅ WebRTC infrastructure initialized");
1367
1368            // Fire `on_start` once the runtime context can see the initialized
1369            // gates, before starting request-accepting/background loops. Its
1370            // Err/panic aborts Node::start.
1371            {
1372                let startup_ctx = self
1373                    .bootstrap_ctx_builder()
1374                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1375                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1376                let call_executor =
1377                    lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1378                let mut workload = self.workload_dispatch.lock().await;
1379                crate::lifecycle::hooks::call_lifecycle_hook(
1380                    "on_start",
1381                    workload.on_start(startup_ctx, invocation, &call_executor),
1382                )
1383                .await?;
1384            }
1385
1386            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1387            // 1.7.6. WebSocket Server (direct-connect mode, optional)
1388            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1389            if let Some(listen_port) = self.config.websocket_listen_port {
1390                tracing::info!(
1391                    "🔌 WebSocket direct-connect mode enabled, binding port {}",
1392                    listen_port
1393                );
1394                use crate::key_cache::AisKeyCache;
1395                use crate::wire::websocket::gate::WsAuthContext;
1396                use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1397
1398                // Build AisKeyCache and seed it with the signing key from the registration response
1399                let ais_key_cache = AisKeyCache::new();
1400                if !register_ok.signing_pubkey.is_empty() {
1401                    match ais_key_cache
1402                        .seed(register_ok.signing_key_id, &register_ok.signing_pubkey)
1403                        .await
1404                    {
1405                        Ok(()) => tracing::info!(
1406                            key_id = register_ok.signing_key_id,
1407                            "🔑 AisKeyCache seeded from RegisterOk"
1408                        ),
1409                        Err(e) => tracing::warn!(
1410                            key_id = register_ok.signing_key_id,
1411                            error = ?e,
1412                            "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1413                        ),
1414                    }
1415                } else {
1416                    tracing::warn!(
1417                        "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1418                    );
1419                }
1420
1421                let auth_ctx = WsAuthContext {
1422                    ais_key_cache,
1423                    actor_id: actor_id.clone(),
1424                    credential_state: credential_state.clone(),
1425                    signaling_client: self.signaling_client.clone(),
1426                };
1427
1428                match WebSocketServer::bind(listen_port).await {
1429                    Ok((ws_server, conn_rx)) => {
1430                        ws_server.start(self.shutdown_token.clone());
1431                        let ws_gate = Arc::new(WebSocketGate::new(
1432                            conn_rx,
1433                            outproc_gate.get_pending_requests(),
1434                            data_stream_registry.clone(),
1435                            Some(auth_ctx),
1436                        ));
1437
1438                        // Install the WebSocket peer-lifecycle hook.
1439                        {
1440                            let actor_id_for_hook = actor_id.clone();
1441                            let credential_state_for_hook = credential_state.clone();
1442                            // Snapshot taken after outproc_gate is live: ws
1443                            // peer-lifecycle hook contexts can issue
1444                            // Dest::Actor(_) calls.
1445                            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1446                            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1447                                Arc::new(move || {
1448                                    let snapshot = ctx_builder_snapshot.clone();
1449                                    let actor_id = actor_id_for_hook.clone();
1450                                    let credential_state = credential_state_for_hook.clone();
1451                                    Box::pin(async move {
1452                                        Some(snapshot.build_bootstrap(
1453                                            &actor_id,
1454                                            &credential_state.credential().await,
1455                                        ))
1456                                    })
1457                                });
1458                            let cb = crate::lifecycle::hooks::build_hook_callback(
1459                                self.hook_observer.clone(),
1460                                ctx_builder,
1461                            );
1462                            ws_gate.set_hook_callback(cb);
1463                        }
1464
1465                        self.websocket_gate = Some(ws_gate);
1466                        tracing::info!(
1467                            "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1468                        );
1469                    }
1470                    Err(e) => {
1471                        tracing::error!(
1472                            "❌ Failed to bind WebSocket server on port {}: {:?}",
1473                            listen_port,
1474                            e
1475                        );
1476                    }
1477                }
1478            }
1479
1480            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1481            // 1.7.5. Create shared state for credential management
1482            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1483            // Shared credential state initialized above; reused across tasks
1484
1485            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1486            // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1487            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1488            {
1489                let shutdown = self.shutdown_token.clone();
1490                let client = self.signaling_client.clone();
1491                let actor_id_for_heartbeat = actor_id.clone();
1492                let credential_state_for_heartbeat = credential_state.clone();
1493                let mailbox_for_heartbeat = self.mailbox.clone();
1494                let register_request_for_heartbeat = register_request.clone();
1495                let webrtc_coordinator_for_heartbeat = self.webrtc_coordinator.clone();
1496                let webrtc_gate_for_heartbeat = self.webrtc_gate.clone();
1497
1498                // Use interval from registration response, default to 30s
1499                let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1500                let heartbeat_interval = if heartbeat_interval_secs > 0 {
1501                    Duration::from_secs(heartbeat_interval_secs as u64)
1502                } else {
1503                    Duration::from_secs(30)
1504                };
1505                let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1506                let realm_secret_for_heartbeat = self.config.realm_secret.clone();
1507                let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1508                    shutdown,
1509                    client,
1510                    actor_id_for_heartbeat,
1511                    credential_state_for_heartbeat,
1512                    mailbox_for_heartbeat,
1513                    heartbeat_interval,
1514                    register_request_for_heartbeat,
1515                    ais_endpoint_for_heartbeat,
1516                    realm_secret_for_heartbeat,
1517                    node_hook_callback.clone(),
1518                    webrtc_coordinator_for_heartbeat,
1519                    webrtc_gate_for_heartbeat,
1520                ));
1521                task_handles.push(heartbeat_handle);
1522            }
1523            tracing::info!(
1524                "✅ Heartbeat task started (interval: {}s)",
1525                register_ok.signaling_heartbeat_interval_secs
1526            );
1527
1528            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1529            // 1.8.5. Spawn network event processing loop
1530            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1531            if let Some(event_rx) = self.network_event_rx.take() {
1532                use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1533
1534                // Create DefaultNetworkEventProcessor
1535                // If debounce config exists, use new_with_debounce
1536                let event_processor =
1537                    if let Some(config) = self.network_event_debounce_config.clone() {
1538                        Arc::new(
1539                            DefaultNetworkEventProcessor::new_with_debounce_and_peer_transport(
1540                                self.signaling_client.clone(),
1541                                self.webrtc_coordinator.clone(),
1542                                config,
1543                                self.peer_transport.clone(),
1544                            ),
1545                        )
1546                    } else {
1547                        Arc::new(DefaultNetworkEventProcessor::new_with_peer_transport(
1548                            self.signaling_client.clone(),
1549                            self.webrtc_coordinator.clone(),
1550                            self.peer_transport.clone(),
1551                        ))
1552                    };
1553
1554                let shutdown = self.shutdown_token.clone();
1555                let network_event_handle = tokio::spawn(async move {
1556                    Self::network_event_loop(event_rx, event_processor, shutdown).await;
1557                });
1558                task_handles.push(network_event_handle);
1559                tracing::info!("network_event.node.loop_started");
1560            } else {
1561                tracing::debug!("network_event.node.loop_not_started_no_handle");
1562            }
1563
1564            {
1565                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1566                // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1567                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1568                //
1569                // This task:
1570                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1571                // - Then sends UnregisterRequest via signaling client with a timeout
1572                //
1573                // NOTE: we push its JoinHandle into task_handles so it can be aborted
1574                // by ActrRefShared::Drop if needed.
1575                let shutdown = self.shutdown_token.clone();
1576                let client = self.signaling_client.clone();
1577                let actor_id_for_unreg = actor_id.clone();
1578                let credential_state_for_unreg = credential_state.clone();
1579                let webrtc_coordinator = self.webrtc_coordinator.clone();
1580
1581                let unregister_handle = tokio::spawn(async move {
1582                    // Wait for shutdown signal
1583                    shutdown.cancelled().await;
1584                    tracing::info!(
1585                        "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1586                        actor_id_for_unreg
1587                    );
1588
1589                    // 1. Close all WebRTC peer connections first (if any)
1590                    if let Some(coord) = webrtc_coordinator {
1591                        if let Err(e) = coord.close_all_peers().await {
1592                            tracing::warn!(
1593                                "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1594                                e
1595                            );
1596                        } else {
1597                            tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1598                        }
1599                    } else {
1600                        tracing::debug!(
1601                            "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1602                        );
1603                    }
1604
1605                    // 2. Then send UnregisterRequest with a timeout (e.g. 5 seconds)
1606                    let result = tokio::time::timeout(
1607                        Duration::from_secs(5),
1608                        client.send_unregister_request(
1609                            actor_id_for_unreg.clone(),
1610                            credential_state_for_unreg.credential().await,
1611                            Some("Graceful shutdown".to_string()),
1612                        ),
1613                    )
1614                    .await;
1615                    tracing::info!("UnregisterRequest result: {:?}", result);
1616                    match result {
1617                        Ok(Ok(_)) => {
1618                            tracing::info!(
1619                                "✅ UnregisterRequest sent to signaling server for Actor {}",
1620                                actor_id_for_unreg
1621                            );
1622                        }
1623                        Ok(Err(e)) => {
1624                            tracing::warn!(
1625                                "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1626                                actor_id_for_unreg,
1627                                e
1628                            );
1629                        }
1630                        Err(_) => {
1631                            tracing::warn!(
1632                                "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1633                                actor_id_for_unreg
1634                            );
1635                        }
1636                    }
1637                });
1638
1639                task_handles.push(unregister_handle);
1640            }
1641        } // end registration setup block
1642
1643        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1644        // 2. Transport layer initialization (completed via WebRTC infrastructure)
1645        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1646        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1647
1648        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1649        // 3.1 Convert to Arc (before starting background loops)
1650        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1651        // Clone actor_id before moving self into Arc
1652        let actor_id = self
1653            .actor_id
1654            .as_ref()
1655            .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1656            .clone();
1657        // Snapshot now that outproc_gate has been wired above; this builder
1658        // is shared between on_start / on_stop hooks and the ActrRefShared
1659        // handle returned to the caller.
1660        let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1661        let credential_state = self
1662            .credential_state
1663            .clone()
1664            .expect("CredentialState must be initialized in start()");
1665        let shutdown_token = self.shutdown_token.clone();
1666        let node_ref = Arc::new(self);
1667
1668        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1669        // 3.2. Register workload-level stop hook.
1670        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1671        {
1672            let node = node_ref.clone();
1673            let actor_id = actor_id.clone();
1674            let credential_state = credential_state.clone();
1675            let shutdown = shutdown_token.clone();
1676            let on_stop_handle = tokio::spawn(async move {
1677                shutdown.cancelled().await;
1678                let stop_ctx = node
1679                    .bootstrap_ctx_builder()
1680                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1681                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1682                let call_executor =
1683                    lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1684                let mut workload = node.workload_dispatch.lock().await;
1685                if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1686                    "on_stop",
1687                    workload.on_stop(stop_ctx, invocation, &call_executor),
1688                )
1689                .await
1690                {
1691                    tracing::warn!(error = %e, "workload on_stop returned Err");
1692                }
1693            });
1694            task_handles.push(on_stop_handle);
1695        }
1696
1697        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1698        // 3.5. Start WebRTC background loops
1699        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1700        tracing::info!("🚀 Starting WebRTC background loops");
1701
1702        // Start WebRtcCoordinator signaling loop
1703        if let Some(coordinator) = &node_ref.webrtc_coordinator {
1704            coordinator.clone().start().await.map_err(|e| {
1705                ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1706            })?;
1707            tracing::info!("✅ WebRtcCoordinator signaling loop started");
1708        }
1709
1710        // Start WebRtcGate message receive loop (route to Mailbox)
1711        if let Some(gate) = &node_ref.webrtc_gate {
1712            gate.start_receive_loop(node_ref.mailbox.clone())
1713                .await
1714                .map_err(|e| {
1715                    ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1716                })?;
1717            tracing::info!("✅ WebRtcGate → Mailbox routing started");
1718        }
1719
1720        // Start WebSocketGate message receive loop (route to Mailbox, direct-connect mode)
1721        if let Some(ws_gate) = &node_ref.websocket_gate {
1722            ws_gate
1723                .start_receive_loop(node_ref.mailbox.clone())
1724                .await
1725                .map_err(|e| {
1726                    ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1727                })?;
1728            tracing::info!("✅ WebSocketGate → Mailbox routing started");
1729        }
1730        tracing::info!("✅ WebRTC background loops started");
1731
1732        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1733        // 4.6. Start Inproc receive loop (Shell → Guest)
1734        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1735        if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1736            tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1737            // Start Guest receive loop (Shell → Guest REQUEST)
1738            if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1739                let node = node_ref.clone();
1740                let request_rx_lane = shell_to_workload
1741                    .get_lane(PayloadType::RpcReliable, None)
1742                    .await
1743                    .map_err(|e| {
1744                        ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1745                    })?;
1746                let response_tx = workload_to_shell.clone();
1747                let shutdown = shutdown_token.clone();
1748
1749                let inproc_handle = tokio::spawn(async move {
1750                    loop {
1751                        tokio::select! {
1752                            _ = shutdown.cancelled() => {
1753                                tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1754                                break;
1755                            }
1756                            envelope_result = request_rx_lane.recv_envelope() => {
1757                                match envelope_result {
1758                                    Ok(envelope) => {
1759                                        let request_id = envelope.request_id.clone();
1760                                        tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1761                                        // Extract and set tracing context from envelope
1762                                        #[cfg(feature = "opentelemetry")]
1763                                        let span = {
1764                                            let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1765                                            let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1766                                            set_parent_from_rpc_envelope(&span, &envelope);
1767                                            span
1768                                        };
1769
1770                                        // Shell calls have no caller_id (local process communication)
1771                                        let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1772                                        #[cfg(feature = "opentelemetry")]
1773                                        let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1774
1775                                        match handle_incoming_fut.await {
1776                                            Ok(response_bytes) => {
1777                                                // Send RESPONSE back via workload_to_shell
1778                                                // Keep same route_key (no prefix needed - separate channels!)
1779                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1780                                                let mut response_envelope = RpcEnvelope {
1781                                                    route_key: envelope.route_key.clone(),
1782                                                    payload: Some(response_bytes),
1783                                                    error: None,
1784                                                    traceparent: None,
1785                                                    tracestate: None,
1786                                                    request_id: request_id.clone(),
1787                                                    metadata: Vec::new(),
1788                                                    timeout_ms: 30000,
1789                                                };
1790                                                // Inject tracing context
1791                                                #[cfg(feature = "opentelemetry")]
1792                                                inject_span_context_to_rpc(&span, &mut response_envelope);
1793
1794                                                // Send via Guest → Shell channel
1795                                                let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1796                                                #[cfg(feature = "opentelemetry")]
1797                                                let send_response_fut = send_response_fut.instrument(span.clone());
1798                                                if let Err(e) = send_response_fut.await {
1799                                                    tracing::error!(
1800                                                        severity = 7,
1801                                                        error_category = "transport_error",
1802                                                        request_id = %request_id,
1803                                                        "❌ Failed to send RESPONSE to Shell: {:?}",
1804                                                        e
1805                                                    );
1806                                                }
1807                                            }
1808                                            Err(e) => {
1809                                                tracing::error!(
1810                                                    severity = 6,
1811                                                    error_category = "handler_error",
1812                                                    request_id = %request_id,
1813                                                    route_key = %envelope.route_key,
1814                                                    "❌ Guest message handling failed: {:?}",
1815                                                    e
1816                                                );
1817
1818                                                // Send error response (system-level error on envelope)
1819                                                let error_response = actr_protocol::ErrorResponse {
1820                                                    code: protocol_error_to_code(&e),
1821                                                    message: e.to_string(),
1822                                                };
1823                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1824                                                let mut error_envelope = RpcEnvelope {
1825                                                    route_key: envelope.route_key.clone(),
1826                                                    payload: None,
1827                                                    error: Some(error_response),
1828                                                    traceparent: envelope.traceparent.clone(),
1829                                                    tracestate: envelope.tracestate.clone(),
1830                                                    request_id: request_id.clone(),
1831                                                    metadata: Vec::new(),
1832                                                    timeout_ms: 30000,
1833                                                };
1834                                                // Inject tracing context
1835                                                #[cfg(feature = "opentelemetry")]
1836                                                inject_span_context_to_rpc(&span, &mut error_envelope);
1837
1838                                                let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1839                                                #[cfg(feature = "opentelemetry")]
1840                                                let send_error_response_fut = send_error_response_fut.instrument(span);
1841                                                if let Err(send_err) = send_error_response_fut.await {
1842                                                    tracing::error!(
1843                                                        severity = 7,
1844                                                        error_category = "transport_error",
1845                                                        request_id = %request_id,
1846                                                        "❌ Failed to send ERROR response to Shell: {:?}",
1847                                                        send_err
1848                                                    );
1849                                                }
1850                                            }
1851                                        }
1852                                    }
1853                                    Err(e) => {
1854                                        tracing::error!(
1855                                            severity = 8,
1856                                            error_category = "transport_error",
1857                                            "❌ Failed to receive from Shell → Guest lane: {:?}",
1858                                            e
1859                                        );
1860                                        break;
1861                                    }
1862                                }
1863                            }
1864                        }
1865                    }
1866                    tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1867                });
1868                task_handles.push(inproc_handle);
1869            }
1870        }
1871        tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1872
1873        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1874        // 4.7. Start Shell receive loop (Guest → Shell RESPONSE)
1875        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1876        tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1877        if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1878            // Start Shell receive loop (Guest → Shell RESPONSE)
1879            if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1880                let response_rx_lane = workload_to_shell
1881                    .get_lane(PayloadType::RpcReliable, None)
1882                    .await
1883                    .map_err(|e| {
1884                        ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1885                    })?;
1886                let request_mgr = shell_to_workload.clone();
1887                let shutdown = shutdown_token.clone();
1888
1889                let shell_receive_handle = tokio::spawn(async move {
1890                    loop {
1891                        tokio::select! {
1892                            _ = shutdown.cancelled() => {
1893                                tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1894                                break;
1895                            }
1896                            envelope_result = response_rx_lane.recv_envelope() => {
1897                                match envelope_result {
1898                                    Ok(envelope) => {
1899                                        tracing::debug!(
1900                                            "📨 Shell received RESPONSE from Guest: request_id={}",
1901                                            envelope.request_id
1902                                        );
1903
1904                                        // Check if response is success or error
1905                                        match (envelope.payload, envelope.error) {
1906                                            (Some(payload), None) => {
1907                                                // Success response
1908                                                if let Err(e) = request_mgr
1909                                                    .complete_response(&envelope.request_id, payload)
1910                                                    .await
1911                                                {
1912                                                    tracing::warn!(
1913                                                        severity = 4,
1914                                                        error_category = "orphan_response",
1915                                                        request_id = %envelope.request_id,
1916                                                        "⚠️  No pending request found for response: {:?}",
1917                                                        e
1918                                                    );
1919                                                }
1920                                            }
1921                                            (None, Some(error)) => {
1922                                                // Error response - convert to ActrError and complete with error
1923                                                let actr_err = ActrError::Unavailable(format!("RPC error {}: {}", error.code, error.message));
1924                                                if let Err(e) = request_mgr
1925                                                    .complete_error(&envelope.request_id, actr_err)
1926                                                    .await
1927                                                {
1928                                                    tracing::warn!(
1929                                                        severity = 4,
1930                                                        error_category = "orphan_response",
1931                                                        request_id = %envelope.request_id,
1932                                                        "⚠️  No pending request found for error response: {:?}",
1933                                                        e
1934                                                    );
1935                                                }
1936                                            }
1937                                            _ => {
1938                                                tracing::error!(
1939                                                    severity = 7,
1940                                                    error_category = "protocol_error",
1941                                                    request_id = %envelope.request_id,
1942                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1943                                                );
1944                                            }
1945                                        }
1946                                    }
1947                                    Err(e) => {
1948                                        tracing::error!(
1949                                            severity = 8,
1950                                            error_category = "transport_error",
1951                                            "❌ Failed to receive from Guest → Shell lane: {:?}",
1952                                            e
1953                                        );
1954                                        break;
1955                                    }
1956                                }
1957                            }
1958                        }
1959                    }
1960                    tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
1961                });
1962                task_handles.push(shell_receive_handle);
1963            }
1964        }
1965        tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
1966
1967        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1968        // 4.9. Mailbox backpressure watchdog
1969        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1970        //
1971        // Emits the framework `on_mailbox_backpressure` hook once per
1972        // rising-edge crossing of the configured threshold.
1973        //
1974        // Preferred path: a push-based notification from the mailbox
1975        // backend via [`Mailbox::set_depth_observer`], which runs
1976        // synchronously on every enqueue and has zero worst-case delay.
1977        //
1978        // Fallback path: mailbox backends without depth support (or
1979        // which can't cheaply compute depth on every enqueue) keep
1980        // using a 1 Hz poll of [`Mailbox::status`].
1981        let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
1982        {
1983            use std::sync::atomic::{AtomicBool, Ordering};
1984            let mailbox = node_ref.mailbox.clone();
1985            let shutdown = shutdown_token.clone();
1986            let hook_cb = node_hook_callback.clone();
1987            let triggered = Arc::new(AtomicBool::new(false));
1988
1989            // Shared rising-edge state + hook-firing closure used by
1990            // both the push and polling code paths.
1991            let fire_if_rising = {
1992                let triggered = triggered.clone();
1993                let hook_cb = hook_cb.clone();
1994                Arc::new(move |queue_len: usize| {
1995                    if queue_len >= backpressure_threshold {
1996                        if !triggered.swap(true, Ordering::AcqRel) {
1997                            if let Some(cb) = hook_cb.as_ref() {
1998                                let cb = cb.clone();
1999                                tokio::spawn(async move {
2000                                    cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
2001                                        queue_len,
2002                                        threshold: backpressure_threshold,
2003                                    })
2004                                    .await;
2005                                });
2006                            } else {
2007                                tracing::warn!(
2008                                    queue_len,
2009                                    threshold = backpressure_threshold,
2010                                    "mailbox backpressure",
2011                                );
2012                            }
2013                        }
2014                    } else if triggered.swap(false, Ordering::AcqRel) {
2015                        tracing::info!(
2016                            queue_len,
2017                            threshold = backpressure_threshold,
2018                            "mailbox backpressure cleared",
2019                        );
2020                    }
2021                })
2022            };
2023
2024            // Try the push path first. The observer installs only if
2025            // the backend supports it; otherwise `installed` is `false`
2026            // and we fall through to polling.
2027            struct EnqueueObserver {
2028                fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2029            }
2030            impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2031                fn on_depth_change(&self, queued_messages: usize) {
2032                    (self.fire)(queued_messages);
2033                }
2034            }
2035
2036            let installed = {
2037                let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2038                    Arc::new(EnqueueObserver {
2039                        fire: fire_if_rising.clone(),
2040                    });
2041                mailbox.set_depth_observer(observer)
2042            };
2043
2044            if installed {
2045                tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2046            } else {
2047                tracing::debug!(
2048                    "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2049                );
2050                let mailbox_for_poll = mailbox.clone();
2051                let shutdown_for_poll = shutdown.clone();
2052                let fire_for_poll = fire_if_rising.clone();
2053                let watchdog_handle = tokio::spawn(async move {
2054                    let mut ticker = tokio::time::interval(Duration::from_secs(1));
2055                    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2056                    loop {
2057                        tokio::select! {
2058                            _ = shutdown_for_poll.cancelled() => {
2059                                tracing::debug!(
2060                                    "mailbox backpressure watchdog shutting down"
2061                                );
2062                                break;
2063                            }
2064                            _ = ticker.tick() => {
2065                                let status = match mailbox_for_poll.status().await {
2066                                    Ok(s) => s,
2067                                    Err(e) => {
2068                                        tracing::debug!(?e, "mailbox status poll failed");
2069                                        continue;
2070                                    }
2071                                };
2072                                fire_for_poll(status.queued_messages as usize);
2073                            }
2074                        }
2075                    }
2076                });
2077                task_handles.push(watchdog_handle);
2078            }
2079        }
2080
2081        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2082        // 5. Start Mailbox processing loop (State Path)
2083        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2084        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2085        {
2086            let node = node_ref.clone();
2087            let mailbox = node_ref.mailbox.clone();
2088            let gate = node_ref.webrtc_gate.clone();
2089            let shutdown = shutdown_token.clone();
2090
2091            let mailbox_handle = tokio::spawn(async move {
2092                loop {
2093                    tokio::select! {
2094                        // Listen for shutdown signal
2095                        _ = shutdown.cancelled() => {
2096                            tracing::info!("📭 Mailbox loop received shutdown signal");
2097                            break;
2098                        }
2099                        // Dequeue messages (by priority)
2100                        result = mailbox.dequeue() => {
2101                            match result {
2102                                Ok(messages) => {
2103                                    if messages.is_empty() {
2104                                        // Queue empty, sleep briefly
2105                                        tokio::time::sleep(Duration::from_millis(10)).await;
2106                                        continue;
2107                                    }
2108                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2109
2110                                    // Process messages one by one
2111                                    for msg_record in messages {
2112                                        // Deserialize RpcEnvelope (Protobuf)
2113                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
2114                                            Ok(envelope) => {
2115                                                let request_id = envelope.request_id.clone();
2116                                                let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2117                                                tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2118
2119                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
2120                                                #[cfg(feature = "opentelemetry")]
2121                                                let span = {
2122                                                    let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2123                                                    let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2124                                                    set_parent_from_rpc_envelope(&span, &envelope);
2125                                                    span
2126                                                };
2127
2128                                                // Decode caller_id from MessageRecord.from (transport layer)
2129                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
2130                                                let caller_id_ref = caller_id_result.as_ref().ok();
2131
2132                                                if caller_id_ref.is_none() {
2133                                                    tracing::warn!(
2134                                                        request_id = %request_id,
2135                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
2136                                                    );
2137                                                }
2138
2139                                                // Call handle_incoming with caller_id from transport layer
2140                                                let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2141                                                #[cfg(feature = "opentelemetry")]
2142                                                let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2143
2144                                                match handle_incoming_fut.await {
2145                                                    Ok(response_bytes) => {
2146                                                        // Send response (reuse request_id)
2147                                                        if let Some(ref gate) = gate {
2148                                                            // Use already decoded caller_id
2149                                                            match caller_id_result {
2150                                                                Ok(caller) => {
2151                                                                    // Construct response RpcEnvelope (reuse request_id!)
2152                                                                    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2153                                                                    let mut response_envelope = RpcEnvelope {
2154                                                                        request_id, // Reuse!
2155                                                                        route_key: envelope.route_key.clone(),
2156                                                                        payload: Some(response_bytes),
2157                                                                        error: None,
2158                                                                        traceparent: envelope.traceparent.clone(),
2159                                                                        tracestate: envelope.tracestate.clone(),
2160                                                                        metadata: Vec::new(), // Response doesn't need extra metadata
2161                                                                        timeout_ms: 30000,
2162                                                                    };
2163                                                                    // Inject tracing context
2164                                                                    #[cfg(feature = "opentelemetry")]
2165                                                                    inject_span_context_to_rpc(&span, &mut response_envelope);
2166
2167                                                                    let send_response_fut = gate.send_response(&caller, response_envelope);
2168                                                                    #[cfg(feature = "opentelemetry")]
2169                                                                    let send_response_fut = send_response_fut.instrument(span);
2170                                                                    if let Err(e) = send_response_fut.await {
2171                                                                        tracing::error!(
2172                                                                            severity = 7,
2173                                                                            error_category = "transport_error",
2174                                                                            request_id = %envelope.request_id,
2175                                                                            "❌ Failed to send response: {:?}",
2176                                                                            e
2177                                                                        );
2178                                                                    }
2179                                                                }
2180                                                                Err(e) => {
2181                                                                    tracing::error!(
2182                                                                        severity = 8,
2183                                                                        error_category = "protobuf_decode",
2184                                                                        request_id = %envelope.request_id,
2185                                                                        "❌ Failed to decode caller_id: {:?}",
2186                                                                        e
2187                                                                    );
2188                                                                }
2189                                                            }
2190                                                        }
2191
2192                                                        // ACK message
2193                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
2194                                                            tracing::error!(
2195                                                                severity = 9,
2196                                                                error_category = "mailbox_error",
2197                                                                request_id = %envelope.request_id,
2198                                                                message_id = %msg_record.id,
2199                                                                "❌ Mailbox ACK failed: {:?}",
2200                                                                e
2201                                                            );
2202                                                        }
2203                                                    }
2204                                                    Err(e) => {
2205                                                        tracing::error!(
2206                                                            severity = 6,
2207                                                            error_category = "handler_error",
2208                                                            request_id = %envelope.request_id,
2209                                                            route_key = %envelope.route_key,
2210                                                            "❌ handle_incoming failed: {:?}", e
2211                                                        );
2212                                                        // ACK to avoid infinite retries
2213                                                        // Application errors are caller's responsibility
2214                                                        let _ = mailbox.ack(msg_record.id).await;
2215                                                    }
2216                                                }
2217                                            }
2218                                            Err(e) => {
2219                                                // Poison message - cannot decode RpcEnvelope
2220                                                tracing::error!(
2221                                                    severity = 9,
2222                                                    error_category = "protobuf_decode",
2223                                                    message_id = %msg_record.id,
2224                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2225                                                    e
2226                                                );
2227
2228                                                // Write to Dead Letter Queue
2229                                                use actr_runtime_mailbox::DlqRecord;
2230                                                use chrono::Utc;
2231                                                use uuid::Uuid;
2232
2233                                                let dlq_record = DlqRecord {
2234                                                    id: Uuid::new_v4(),
2235                                                    original_message_id: Some(msg_record.id.to_string()),
2236                                                    from: Some(msg_record.from.clone()),
2237                                                    to: node.actor_id.as_ref().map(|id| {
2238                                                        let mut buf = Vec::new();
2239                                                        id.encode(&mut buf).unwrap();
2240                                                        buf
2241                                                    }),
2242                                                    raw_bytes: msg_record.payload.clone(),
2243                                                    error_message: format!("Protobuf decode failed: {e}"),
2244                                                    error_category: "protobuf_decode".to_string(),
2245                                                    trace_id: format!("mailbox-{}", msg_record.id),
2246                                                    request_id: None,
2247                                                    created_at: Utc::now(),
2248                                                    redrive_attempts: 0,
2249                                                    last_redrive_at: None,
2250                                                    context: Some(format!(
2251                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
2252                                                        match msg_record.priority {
2253                                                            actr_runtime_mailbox::MessagePriority::High => "high",
2254                                                            actr_runtime_mailbox::MessagePriority::Normal => "normal",
2255                                                        }
2256                                                    )),
2257                                                };
2258
2259                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2260                                                    tracing::error!(
2261                                                        severity = 10,
2262                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2263                                                        dlq_err
2264                                                    );
2265                                                } else {
2266                                                    tracing::warn!(
2267                                                        severity = 9,
2268                                                        "☠️ Poison message moved to DLQ: message_id={}",
2269                                                        msg_record.id
2270                                                    );
2271                                                }
2272
2273                                                // ACK the poison message to remove from mailbox
2274                                                let _ = mailbox.ack(msg_record.id).await;
2275                                            }
2276                                        }
2277                                    }
2278                                }
2279                                Err(e) => {
2280                                    tracing::error!(
2281                                        severity = 9,
2282                                        error_category = "mailbox_error",
2283                                        "❌ Mailbox dequeue failed: {:?}", e
2284                                    );
2285                                    tokio::time::sleep(Duration::from_secs(1)).await;
2286                                }
2287                            }
2288                        }
2289                    }
2290                }
2291                tracing::info!("✅ Mailbox processing loop terminated gracefully");
2292            });
2293
2294            task_handles.push(mailbox_handle);
2295        }
2296        tracing::info!("✅ Mailbox processing loop started");
2297        tracing::info!("✅ ActrNode started successfully");
2298
2299        {
2300            let ready_ctx = bootstrap_ctx_builder
2301                .build_bootstrap(&actor_id, &credential_state.credential().await);
2302            let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2303            let call_executor =
2304                lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2305            let mut workload = node_ref.workload_dispatch.lock().await;
2306            if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2307                "on_ready",
2308                workload.on_ready(ready_ctx, invocation, &call_executor),
2309            )
2310            .await
2311            {
2312                tracing::warn!(error = %e, "workload on_ready returned Err");
2313            }
2314        }
2315
2316        // Create ActrRefShared
2317        let shared = Arc::new(ActrRefShared {
2318            actor_id,
2319            bootstrap_ctx_builder,
2320            credential_state,
2321            shutdown_token,
2322            task_handles: Mutex::new(task_handles),
2323        });
2324
2325        // Create ActrRef
2326        tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2327
2328        Ok(ActrRef { shared })
2329    }
2330}