Skip to main content

actr_hyper/lifecycle/
node.rs

1//! Node runtime inner — holds all running-state fields for an attached node.
2//!
3//! This module is the internal implementation backing the public
4//! `Node<Attached>` / `Node<Registered>` typestate chain defined in
5//! `crate::lib`. The struct itself is crate-private; consumers interact with
6//! it indirectly through `Node<S>` → `ActrRef` transitions.
7
8use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21    AIdCredential, ActorResult, ActrError, ActrId, PayloadType, RegisterAuthMode, RegisterRequest,
22    RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
/// Internal running-state of an attached node.
///
/// Holds every field required to run a workload after `Hyper::attach` has
/// bound a package. Kept private to the crate: external callers use the
/// public `Node<S>` wrappers in `crate::lib` and the `ActrRef` handle
/// returned by `Node::start`.
///
/// Several fields are `Option`s that start out `None` and are filled in
/// during startup; `handle_incoming` refuses to run (returning
/// `ActrError::Internal`) while `actor_id` or `credential_state` is unset.
pub(crate) struct Inner {
    /// Runtime configuration.
    ///
    /// `handle_incoming` reads `config.acl` for the ACL permission check.
    pub(crate) config: actr_config::RuntimeConfig,

    /// SQLite persistent mailbox
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// In-process gate for `Dest::Shell` / `Dest::Local` calls.
    ///
    /// Created in `build()` together with `shell_to_workload` so the inproc
    /// lane is usable as soon as the node exists, even before registration.
    pub(crate) inproc_gate: Gate,

    /// Cross-process gate for `Dest::Actor(_)` calls.
    ///
    /// `None` until `start()` finishes WebRTC / PeerGate initialization. Any
    /// outbound call issued before that point returns `Internal("PeerGate
    /// not initialized yet")` — see `RuntimeContext::select_gate`.
    pub(crate) outproc_gate: Option<Gate>,

    /// DataStream callback registry shared between the inbound WebRTC / WS
    /// gates (which dispatch into it) and `RuntimeContext`
    /// (register_stream / send_data_stream).
    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,

    /// MediaTrack callback registry shared between WebRTC media tracks and
    /// `RuntimeContext` (register_media_track / send_media_sample).
    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,

    /// Signaling client
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID (obtained after startup).
    ///
    /// While this is `None`, `handle_incoming` fails with
    /// `ActrError::Internal`.
    pub(crate) actor_id: Option<ActrId>,

    /// Actor Credential (obtained after startup, used for subsequent authentication messages).
    ///
    /// While this is `None`, `handle_incoming` fails with
    /// `ActrError::Internal`.
    pub(crate) credential_state: Option<CredentialState>,

    /// WebRTC coordinator (created after startup)
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    /// WebRTC Gate (created after startup)
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// WebSocket Gate (direct-connect mode inbound, optional)
    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,

    /// Shell → Workload transport (REQUEST direction)
    ///
    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,

    /// Workload → Shell transport (RESPONSE direction)
    ///
    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,

    /// Shutdown token for graceful shutdown
    pub(crate) shutdown_token: CancellationToken,

    /// Packaged manifest.lock.toml content loaded at startup for fingerprint lookups.
    ///
    /// Wrapped in `Arc` so per-request `RuntimeContext` clones only bump a refcount
    /// instead of deep-cloning the dependency vector.
    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,

    /// Network event receiver (from NetworkEventHandle)
    pub(crate) network_event_rx:
        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>>,

    /// Network event result sender (to NetworkEventHandle)
    pub(crate) network_event_result_tx:
        Option<tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>>,

    /// Network event debounce configuration
    pub(crate) network_event_debounce_config:
        Option<crate::lifecycle::network_event::DebounceConfig>,

    /// Request deduplication state (15 s TTL response cache, prevents double-processing on retry).
    ///
    /// `handle_incoming` consults it via `check_or_mark` before dispatch and
    /// records the final result via `complete` afterwards.
    pub(crate) dedup_state: Arc<Mutex<DedupState>>,

    /// Verified package manifest for package-backed nodes.
    // NOTE(review): `dead_code` is allowed here without a stated reason —
    // presumably the field is only read under some build configurations; confirm.
    #[allow(dead_code)]
    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,

    /// Pre-issued registration credential injected by the Hyper layer during
    /// the `Attached → Registered` state transition. `start()` uses it directly
    /// instead of re-registering with the signaling server.
    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,

    /// Shared WebSocket direct-connect address map populated by discovery.
    ///
    /// Shared with `DefaultWireBuilder` so discovered ws:// URLs can be reused
    /// directly instead of relying on a static url_template.
    /// The map is keyed by `ActrId`.
    pub(crate) discovered_ws_addresses:
        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,

    /// Runtime workload (WASM, dynclib, etc.)
    ///
    /// `handle_incoming` dispatches through this workload.
    ///
    /// The `Mutex` serializes dispatch into a single guest actor instance:
    /// `WasmWorkload::handle` and `DynClibWorkload::handle` both take
    /// `&mut self` because the underlying Wasmtime `Store` / native guest
    /// ABI is single-threaded, so concurrent dispatch through the same
    /// instance would be unsound. Lifecycle hooks also take this lock because
    /// package-backed WASM / dynclib workloads expose them on the same guest
    /// instance; transport and other observation hooks reach linked workloads
    /// through `hook_observer` without holding this lock.
    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,

    /// Optional shell-side observer that receives linked-workload transport /
    /// credential / mailbox hook invocations.
    ///
    /// `None` means "no observer installed"; the built-in tracing defaults
    /// still fire from the event-source wiring sites. When `Some`, hook
    /// invocations are dispatched through `lifecycle::hooks::spawn_hook`
    /// so panics in observer code cannot unwind into the event source.
    // NOTE(review): `dead_code` is allowed here without a stated reason; confirm
    // whether the field is still unused in some configurations.
    #[allow(dead_code)]
    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,

    /// Queue-length threshold at which the mailbox backpressure
    /// watchdog fires the framework `on_mailbox_backpressure` hook.
    ///
    /// Resolved from [`HyperConfig`] at node construction time so the
    /// runtime loop does not need to hold a reference back to `HyperConfig`.
    pub(crate) mailbox_backpressure_threshold: usize,

    /// Lead time before credential expiry at which the framework fires
    /// the `on_credential_expiring` hook. Resolved from [`HyperConfig`]
    /// at node construction time.
    // NOTE(review): `dead_code` is allowed here without a stated reason; confirm
    // whether the field is still unused in some configurations.
    #[allow(dead_code)]
    pub(crate) credential_expiry_warning: Duration,
}
176
/// Credential state for shared access between tasks.
///
/// The data lives behind an `Arc<RwLock<_>>`, so cloning this handle copies
/// only the `Arc`: every clone reads and updates the same underlying
/// credential data.
#[derive(Clone)]
pub struct CredentialState {
    /// Shared, lock-protected credential data.
    inner: Arc<RwLock<CredentialStateInner>>,
}
182
/// Credential data stored behind the lock of a `CredentialState` handle.
#[derive(Clone)]
struct CredentialStateInner {
    /// Current actor credential.
    credential: AIdCredential,
    /// Expiry timestamp of the credential, if one was supplied.
    expires_at: Option<prost_types::Timestamp>,
    /// HMAC time-limited TURN credential, updated together with credential on registration/renewal
    turn_credential: Option<TurnCredential>,
}
190
191impl CredentialState {
192    /// Create a new CredentialState with TURN credential
193    pub fn new(
194        credential: AIdCredential,
195        expires_at: Option<prost_types::Timestamp>,
196        turn_credential: Option<TurnCredential>,
197    ) -> Self {
198        Self {
199            inner: Arc::new(RwLock::new(CredentialStateInner {
200                credential,
201                expires_at,
202                turn_credential,
203            })),
204        }
205    }
206
207    pub async fn credential(&self) -> AIdCredential {
208        self.inner.read().await.credential.clone()
209    }
210
211    pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
212        self.inner.read().await.expires_at
213    }
214
215    /// Get TURN credential (HMAC time-limited credential)
216    pub async fn turn_credential(&self) -> Option<TurnCredential> {
217        self.inner.read().await.turn_credential.clone()
218    }
219
220    /// Update credential and TURN credential
221    ///
222    /// Called on credential renewal; only overwrites the old TURN credential when the new one is not empty
223    pub(crate) async fn update(
224        &self,
225        credential: AIdCredential,
226        expires_at: Option<prost_types::Timestamp>,
227        turn_credential: Option<TurnCredential>,
228    ) {
229        let mut guard = self.inner.write().await;
230        guard.credential = credential;
231        guard.expires_at = expires_at;
232        if turn_credential.is_some() {
233            guard.turn_credential = turn_credential;
234        }
235    }
236}
237
238/// Host operation executor - routes guest outbound calls through RuntimeContext
239///
240/// Called by the workload dispatch path in `handle_incoming`.
241async fn host_operation_handler(
242    ctx: crate::context::RuntimeContext,
243    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
244    pending: crate::workload::HostOperation,
245) -> crate::workload::HostOperationResult {
246    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
247    use actr_framework::guest::dynclib_abi::code as abi_code;
248    use actr_framework::{Context as _, Dest};
249    use actr_protocol::{DataStream, PayloadType};
250
251    /// Map `ActrError` to ABI error code, preserving semantics for guest-side discrimination
252    fn actr_error_to_code(err: &ActrError) -> i32 {
253        match err {
254            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
255            _ => abi_code::GENERIC_ERROR,
256        }
257    }
258
259    match pending {
260        HostOperation::CallRaw(req) => {
261            match ctx
262                .call_raw(
263                    &Dest::Actor(req.target),
264                    req.route_key,
265                    PayloadType::RpcReliable,
266                    bytes::Bytes::from(req.payload),
267                    30_000,
268                )
269                .await
270            {
271                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
272                Err(e) => {
273                    tracing::error!("call_raw routing failed: {e:?}");
274                    HostOperationResult::Error(actr_error_to_code(&e))
275                }
276            }
277        }
278
279        HostOperation::Call(req) => {
280            let dest = match decode_dest(&req.dest) {
281                Some(d) => d,
282                None => {
283                    tracing::error!(route_key = req.route_key, "call: dest decode failed");
284                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
285                }
286            };
287            match ctx
288                .call_raw(
289                    &dest,
290                    req.route_key,
291                    PayloadType::RpcReliable,
292                    bytes::Bytes::from(req.payload),
293                    30_000,
294                )
295                .await
296            {
297                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
298                Err(e) => {
299                    tracing::error!("call routing failed: {e:?}");
300                    HostOperationResult::Error(actr_error_to_code(&e))
301                }
302            }
303        }
304
305        HostOperation::Tell(req) => {
306            let dest = match decode_dest(&req.dest) {
307                Some(d) => d,
308                None => {
309                    tracing::error!(route_key = req.route_key, "tell: dest decode failed");
310                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
311                }
312            };
313            match ctx
314                .tell_raw(
315                    &dest,
316                    req.route_key,
317                    PayloadType::RpcReliable,
318                    bytes::Bytes::from(req.payload),
319                )
320                .await
321            {
322                Ok(()) => HostOperationResult::Done,
323                Err(e) => {
324                    tracing::error!("tell routing failed: {e:?}");
325                    HostOperationResult::Error(actr_error_to_code(&e))
326                }
327            }
328        }
329
330        HostOperation::Discover(req) => {
331            match ctx.discover_route_candidate(&req.target_type).await {
332                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
333                Err(e) => {
334                    tracing::error!("discover failed: {e:?}");
335                    HostOperationResult::Error(actr_error_to_code(&e))
336                }
337            }
338        }
339
340        HostOperation::RegisterStream(req) => {
341            let stream_id = req.stream_id;
342            let callback_ctx = ctx.clone();
343            let callback_workload_dispatch = workload_dispatch.clone();
344            match ctx
345                .register_stream(stream_id, move |chunk: DataStream, sender| {
346                    let ctx_for_executor = callback_ctx.clone();
347                    let workload_dispatch = callback_workload_dispatch.clone();
348                    Box::pin(async move {
349                        let invocation = crate::workload::InvocationContext {
350                            self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
351                            caller_id: Some(sender.clone()),
352                            request_id: format!(
353                                "data-stream:{}:{}",
354                                chunk.stream_id, chunk.sequence
355                            ),
356                        };
357                        let call_executor: crate::workload::HostAbiFn =
358                            std::sync::Arc::new(move |pending| {
359                                let ctx = ctx_for_executor.clone();
360                                Box::pin(async move {
361                                    stream_callback_host_operation_handler(ctx, pending).await
362                                })
363                            });
364                        let mut guard = workload_dispatch.lock().await;
365                        guard
366                            .dispatch_data_stream(chunk, sender, invocation, &call_executor)
367                            .await
368                    })
369                })
370                .await
371            {
372                Ok(()) => HostOperationResult::Done,
373                Err(e) => {
374                    tracing::error!("register_stream failed: {e:?}");
375                    HostOperationResult::Error(actr_error_to_code(&e))
376                }
377            }
378        }
379
380        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
381            Ok(()) => HostOperationResult::Done,
382            Err(e) => {
383                tracing::error!("unregister_stream failed: {e:?}");
384                HostOperationResult::Error(actr_error_to_code(&e))
385            }
386        },
387
388        HostOperation::SendDataStream(req) => {
389            let dest = match decode_dest(&req.dest) {
390                Some(d) => d,
391                None => {
392                    tracing::error!("send_data_stream: dest decode failed");
393                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
394                }
395            };
396            let payload_type = match PayloadType::try_from(req.payload_type) {
397                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
398                    PayloadType::try_from(req.payload_type).expect("checked payload type")
399                }
400                Ok(other) => {
401                    tracing::error!(?other, "send_data_stream: invalid stream payload type");
402                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
403                }
404                Err(_) => {
405                    tracing::error!(
406                        payload_type = req.payload_type,
407                        "send_data_stream: unknown payload type"
408                    );
409                    return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
410                }
411            };
412            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
413                Ok(()) => HostOperationResult::Done,
414                Err(e) => {
415                    tracing::error!("send_data_stream failed: {e:?}");
416                    HostOperationResult::Error(actr_error_to_code(&e))
417                }
418            }
419        }
420    }
421}
422
423fn lifecycle_invocation(
424    actor_id: &ActrId,
425    request_id: &'static str,
426) -> crate::workload::InvocationContext {
427    crate::workload::InvocationContext {
428        self_id: actor_id.clone(),
429        caller_id: None,
430        request_id: request_id.to_string(),
431    }
432}
433
434pub(crate) fn lifecycle_host_abi(
435    ctx: crate::context::RuntimeContext,
436    workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
437) -> crate::workload::HostAbiFn {
438    std::sync::Arc::new(move |pending| {
439        let ctx = ctx.clone();
440        let workload_dispatch = workload_dispatch.clone();
441        Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
442    })
443}
444
445async fn stream_callback_host_operation_handler(
446    ctx: crate::context::RuntimeContext,
447    pending: crate::workload::HostOperation,
448) -> crate::workload::HostOperationResult {
449    use crate::workload::{HostOperation, HostOperationResult, decode_dest};
450    use actr_framework::guest::dynclib_abi::code as abi_code;
451    use actr_framework::{Context as _, Dest};
452    use actr_protocol::PayloadType;
453
454    fn actr_error_to_code(err: &ActrError) -> i32 {
455        match err {
456            ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
457            _ => abi_code::GENERIC_ERROR,
458        }
459    }
460
461    match pending {
462        HostOperation::CallRaw(req) => {
463            match ctx
464                .call_raw(
465                    &Dest::Actor(req.target),
466                    req.route_key,
467                    PayloadType::RpcReliable,
468                    bytes::Bytes::from(req.payload),
469                    30_000,
470                )
471                .await
472            {
473                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
474                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
475            }
476        }
477        HostOperation::Call(req) => {
478            let dest = match decode_dest(&req.dest) {
479                Some(d) => d,
480                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
481            };
482            match ctx
483                .call_raw(
484                    &dest,
485                    req.route_key,
486                    PayloadType::RpcReliable,
487                    bytes::Bytes::from(req.payload),
488                    30_000,
489                )
490                .await
491            {
492                Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
493                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
494            }
495        }
496        HostOperation::Tell(req) => {
497            let dest = match decode_dest(&req.dest) {
498                Some(d) => d,
499                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
500            };
501            match ctx
502                .tell_raw(
503                    &dest,
504                    req.route_key,
505                    PayloadType::RpcReliable,
506                    bytes::Bytes::from(req.payload),
507                )
508                .await
509            {
510                Ok(()) => HostOperationResult::Done,
511                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
512            }
513        }
514        HostOperation::Discover(req) => {
515            match ctx.discover_route_candidate(&req.target_type).await {
516                Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
517                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
518            }
519        }
520        HostOperation::RegisterStream(_) => {
521            tracing::error!("register_stream from inside a stream callback is not supported");
522            HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
523        }
524        HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
525            Ok(()) => HostOperationResult::Done,
526            Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
527        },
528        HostOperation::SendDataStream(req) => {
529            let dest = match decode_dest(&req.dest) {
530                Some(d) => d,
531                None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
532            };
533            let payload_type = match PayloadType::try_from(req.payload_type) {
534                Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
535                    PayloadType::try_from(req.payload_type).expect("checked payload type")
536                }
537                Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
538            };
539            match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
540                Ok(()) => HostOperationResult::Done,
541                Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
542            }
543        }
544    }
545}
546
547/// Map ActrError to error code for ErrorResponse
548fn protocol_error_to_code(err: &ActrError) -> u32 {
549    match err {
550        ActrError::Unavailable(_) => 503,            // Service Unavailable
551        ActrError::TimedOut => 504,                  // Gateway Timeout
552        ActrError::NotFound(_) => 404,               // Not Found
553        ActrError::PermissionDenied(_) => 403,       // Forbidden
554        ActrError::InvalidArgument(_) => 400,        // Bad Request
555        ActrError::UnknownRoute(_) => 404,           // Not Found - route not found
556        ActrError::DependencyNotFound { .. } => 400, // Bad Request
557        ActrError::DecodeFailure(_) => 400,          // Bad Request - decode failure
558        ActrError::NotImplemented(_) => 501,         // Not Implemented
559        ActrError::Internal(_) => 500,               // Internal Server Error
560    }
561}
562
563impl Inner {
564    #[allow(dead_code)]
565    pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
566        self.package_manifest.as_ref()
567    }
568
569    /// Network event processing loop (background task)
570    ///
571    /// # Responsibilities
572    /// - Receive network events from Channel
573    /// - Delegate to NetworkEventProcessor for handling
574    /// - Record processing time and send results
575    async fn network_event_loop(
576        event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>,
577        result_tx: tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>,
578        event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
579        shutdown_token: CancellationToken,
580    ) {
581        crate::lifecycle::network_event::run_network_event_reconciler(
582            event_rx,
583            result_tx,
584            event_processor,
585            shutdown_token,
586        )
587        .await;
588    }
589
590    fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
591        if timeout_ms > 0 {
592            Duration::from_millis(timeout_ms as u64)
593        } else {
594            DEDUP_TTL
595        }
596    }
597
598    async fn wait_for_inflight_duplicate(
599        mut waiter: DedupWaiter,
600        timeout: Duration,
601    ) -> ActorResult<Bytes> {
602        let wait_for_result = async {
603            loop {
604                if let Some(result) = waiter.borrow().clone() {
605                    return result;
606                }
607
608                if waiter.changed().await.is_err() {
609                    if let Some(result) = waiter.borrow().clone() {
610                        return result;
611                    }
612                    return Err(ActrError::Unavailable(
613                        "duplicate request result unavailable".to_string(),
614                    ));
615                }
616            }
617        };
618
619        match tokio::time::timeout(timeout, wait_for_result).await {
620            Ok(result) => result,
621            Err(_) => Err(ActrError::Unavailable(format!(
622                "duplicate request in-flight timed out after {}ms",
623                timeout.as_millis()
624            ))),
625        }
626    }
627
628    /// - Single-hop calls: effectively identical
629    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
630    #[cfg_attr(
631        feature = "opentelemetry",
632        tracing::instrument(
633            skip_all,
634            name = "ActrNode.handle_incoming",
635            fields(
636                actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
637                route_key = %envelope.route_key,
638                request_id = %envelope.request_id,
639            )
640        )
641    )]
642    pub async fn handle_incoming(
643        &self,
644        envelope: RpcEnvelope,
645        caller_id: Option<&ActrId>,
646    ) -> ActorResult<Bytes> {
647        // Log received message
648        if let Some(caller) = caller_id {
649            tracing::debug!(
650                "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
651                envelope.route_key,
652                caller,
653                envelope.request_id
654            );
655        } else {
656            tracing::debug!(
657                "📨 Handling incoming message: route_key={}, request_id={}",
658                envelope.route_key,
659                envelope.request_id
660            );
661        }
662
663        // 0. Get actor_id early for ACL check
664        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
665            ActrError::Internal(
666                "Actor ID not set - node must be started before handling messages".to_string(),
667            )
668        })?;
669
670        // 0.1. ACL Permission Check (before processing message)
671        let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
672            .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
673
674        if !acl_allowed {
675            tracing::warn!(
676                severity = 5,
677                error_category = "acl_denied",
678                request_id = %envelope.request_id,
679                route_key = %envelope.route_key,
680                caller = %caller_id
681                    .map(|c| c.to_string())
682                    .unwrap_or_else(|| "<none>".to_string()),
683                "🚫 ACL: Permission denied"
684            );
685
686            return Err(ActrError::PermissionDenied(format!(
687                "ACL denied: {} is not allowed to call {}",
688                caller_id
689                    .map(|c| c.to_string())
690                    .unwrap_or_else(|| "<unknown>".to_string()),
691                actor_id
692            )));
693        }
694
695        // 0.2. Deduplication: return cached response for retried request_ids
696        let outcome = {
697            self.dedup_state
698                .lock()
699                .await
700                .check_or_mark(&envelope.request_id)
701        };
702        match outcome {
703            DedupOutcome::Fresh => {} // proceed normally
704            DedupOutcome::InFlight(waiter) => {
705                tracing::debug!(
706                    request_id = %envelope.request_id,
707                    route_key = %envelope.route_key,
708                    "duplicate request in-flight; waiting for original result"
709                );
710                return Self::wait_for_inflight_duplicate(
711                    waiter,
712                    Self::duplicate_wait_timeout(envelope.timeout_ms),
713                )
714                .await;
715            }
716            DedupOutcome::Duplicate(cached) => {
717                tracing::debug!(
718                    request_id = %envelope.request_id,
719                    route_key = %envelope.route_key,
720                    "♻️ returning cached response for duplicate request_id"
721                );
722                return cached;
723            }
724        }
725
726        // 1. Create Context with caller_id from transport layer
727        let credential_state = self.credential_state.clone().ok_or_else(|| {
728            ActrError::Internal(
729                "Credential not set - node must be started before handling messages".to_string(),
730            )
731        })?;
732        let ctx = self.make_runtime_context(
733            actor_id,
734            caller_id, // caller_id from transport layer (MessageRecord.from)
735            &envelope.request_id,
736            &credential_state.credential().await,
737        );
738
739        // 2. Dispatch
740        let dispatch_ctx = crate::workload::InvocationContext {
741            self_id: actor_id.clone(),
742            caller_id: caller_id.cloned(),
743            request_id: envelope.request_id.clone(),
744        };
745        let ctx_for_executor = ctx.clone();
746        let workload_for_executor = self.workload_dispatch.clone();
747        let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
748            let ctx = ctx_for_executor.clone();
749            let workload_dispatch = workload_for_executor.clone();
750            Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
751        });
752
753        let mut guard = self.workload_dispatch.lock().await;
754        let result = guard
755            .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
756            .await
757            .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e:?}")));
758
759        match &result {
760            Ok(_) => tracing::debug!(
761                request_id = %envelope.request_id,
762                route_key = %envelope.route_key,
763                "✅ Message handled successfully"
764            ),
765            Err(e) => tracing::error!(
766                severity = 6,
767                error_category = "handler_error",
768                request_id = %envelope.request_id,
769                route_key = %envelope.route_key,
770                "❌ Message handling failed: {:?}", e
771            ),
772        }
773
774        // 3. Store completed result in dedup cache before returning
775        self.dedup_state
776            .lock()
777            .await
778            .complete(&envelope.request_id, result.clone());
779
780        result
781    }
782
783    /// Build a new `Inner` from config and runtime workload.
784    ///
785    /// This is the internal constructor behind the public node builders and
786    /// Hyper package attach helpers.
787    pub(crate) async fn build(
788        config: actr_config::RuntimeConfig,
789        workload: crate::workload::Workload,
790        package_manifest: Option<actr_pack::PackageManifest>,
791        packaged_lock: Option<actr_config::lock::LockFile>,
792        mailbox_backpressure_threshold: usize,
793        credential_expiry_warning: Duration,
794    ) -> ActorResult<Self> {
795        use crate::outbound::{Gate, HostGate};
796        use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
797
798        tracing::info!("🚀 Initializing ActrNode");
799
800        // Initialize Mailbox
801        let mailbox_path = config
802            .mailbox_path
803            .as_ref()
804            .map(|p| p.to_string_lossy().to_string())
805            .unwrap_or_else(|| ":memory:".to_string());
806
807        tracing::info!("📂 Mailbox database path: {}", mailbox_path);
808
809        let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
810            actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
811                .await
812                .map_err(|e| {
813                    actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
814                })?,
815        );
816
817        // Initialize Dead Letter Queue
818        let dlq_path = if mailbox_path == ":memory:" {
819            ":memory:".to_string()
820        } else {
821            format!("{mailbox_path}.dlq")
822        };
823
824        let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
825            actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
826                .await
827                .map_err(|e| {
828                    actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
829                })?,
830        );
831        tracing::info!("✅ Dead Letter Queue initialized");
832
833        // Initialize signaling client
834        let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
835            Some("answer".to_string())
836        } else {
837            None
838        };
839
840        let signaling_config = SignalingConfig {
841            server_url: config.signaling_url.clone(),
842            connection_timeout: 30,
843            heartbeat_interval: 30,
844            reconnect_config: ReconnectConfig::default(),
845            auth_config: None,
846            webrtc_role,
847        };
848
849        let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
850        client.start_reconnect_manager();
851        let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
852
853        // Initialize inproc infrastructure (Shell ↔ Guest)
854        let shell_to_workload = Arc::new(HostTransport::new());
855        let workload_to_shell = Arc::new(HostTransport::new());
856        let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
857
858        let data_stream_registry = Arc::new(DataStreamRegistry::new());
859        let media_frame_registry = Arc::new(MediaFrameRegistry::new());
860
861        tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
862
863        let actr_lock = if let Some(lock) = packaged_lock {
864            tracing::info!(
865                "📋 Loaded packaged manifest.lock.toml with {} dependencies",
866                lock.dependencies.len()
867            );
868            Some(Arc::new(lock))
869        } else {
870            tracing::warn!(
871                "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
872            );
873            None
874        };
875
876        tracing::info!("✅ ActrNode initialized");
877
878        Ok(Self {
879            config,
880            mailbox,
881            dlq,
882            inproc_gate,
883            outproc_gate: None, // Populated in start() once WebRTC / PeerGate is ready.
884            data_stream_registry,
885            media_frame_registry,
886            signaling_client,
887            actor_id: None,
888            credential_state: None,
889            webrtc_coordinator: None,
890            webrtc_gate: None,
891            websocket_gate: None,
892            shell_to_workload: Some(shell_to_workload),
893            workload_to_shell: Some(workload_to_shell),
894            shutdown_token: CancellationToken::new(),
895            actr_lock,
896            network_event_rx: None,
897            network_event_result_tx: None,
898            network_event_debounce_config: None,
899            dedup_state: Arc::new(Mutex::new(DedupState::new())),
900            package_manifest,
901            preregistered_credential: None,
902            discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
903                std::collections::HashMap::new(),
904            )),
905            workload_dispatch: Arc::new(Mutex::new(workload)),
906            hook_observer: None,
907            mailbox_backpressure_threshold,
908            credential_expiry_warning,
909        })
910    }
911
912    /// Snapshot the current runtime handles into a `BootstrapContextBuilder`.
913    ///
914    /// The returned builder is cloned into long-lived hook closures and into
915    /// `ActrRefShared` so those paths can materialize bootstrap contexts
916    /// without retaining a reference back to `Inner`. The snapshot freezes
917    /// `outproc_gate` and `actr_lock` at call time — callers that want to
918    /// observe a later-initialized `outproc_gate` must rebuild.
919    pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
920        BootstrapContextBuilder::new(
921            self.inproc_gate.clone(),
922            self.outproc_gate.clone(),
923            self.data_stream_registry.clone(),
924            self.media_frame_registry.clone(),
925            self.signaling_client.clone(),
926            self.actr_lock.clone(),
927        )
928    }
929
930    /// Build a `RuntimeContext` for the per-request dispatch path.
931    ///
932    /// Unlike `BootstrapContextBuilder::build_bootstrap`, this carries the
933    /// envelope's caller identity and request id through into the context.
934    pub(crate) fn make_runtime_context(
935        &self,
936        self_id: &ActrId,
937        caller_id: Option<&ActrId>,
938        request_id: &str,
939        credential: &AIdCredential,
940    ) -> RuntimeContext {
941        RuntimeContext::new(
942            self_id.clone(),
943            caller_id.cloned(),
944            request_id.to_string(),
945            self.inproc_gate.clone(),
946            self.outproc_gate.clone(),
947            self.data_stream_registry.clone(),
948            self.media_frame_registry.clone(),
949            self.signaling_client.clone(),
950            credential.clone(),
951            self.actr_lock.clone(),
952        )
953    }
954
955    /// Create network event processing infrastructure (called on demand, before `start()`).
956    ///
957    /// # Parameters
958    /// - `debounce_ms`: Debounce window in milliseconds. If 0, no debounce.
959    ///
960    /// # Panics
961    /// Panics if called more than once.
962    pub fn create_network_event_handle(
963        &mut self,
964        debounce_ms: u64,
965    ) -> crate::lifecycle::NetworkEventHandle {
966        if self.network_event_rx.is_some() {
967            panic!("create_network_event_handle() can only be called once");
968        }
969
970        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
971        let (result_tx, result_rx) = tokio::sync::mpsc::channel(100);
972
973        let debounce_config = if debounce_ms > 0 {
974            Some(crate::lifecycle::network_event::DebounceConfig {
975                window: std::time::Duration::from_millis(debounce_ms),
976            })
977        } else {
978            None
979        };
980
981        self.network_event_rx = Some(event_rx);
982        self.network_event_result_tx = Some(result_tx);
983        self.network_event_debounce_config = debounce_config;
984
985        crate::lifecycle::NetworkEventHandle::new(event_tx, result_rx)
986    }
987
988    /// Attach a credential already issued by AIS so that `start()` can skip
989    /// the signaling registration step.
990    ///
991    /// Called by the Hyper layer between `Hyper::register()` and `Hyper::start()`.
992    pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
993        tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
994        self.preregistered_credential = Some(register_ok);
995    }
996
997    /// Start the system
998    pub async fn start(mut self) -> ActorResult<ActrRef> {
999        tracing::info!("🚀 Starting ActrNode");
1000        tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1001
1002        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1003        // 1. Build RegisterRequest
1004        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1005        // Get ActrType from configuration
1006        let actr_type = self.config.actr_type().clone();
1007        tracing::info!("📋 Actor type: {}", actr_type);
1008
1009        // ServiceSpec is derived by the Hyper layer from the verified package
1010        // (see `service_spec::calculate_service_spec_from_package`). The raw
1011        // ActrNode::start() path has no package context and always sends None
1012        // on its own RegisterRequest; callers that need a spec must go
1013        // through `Hyper::register()`.
1014        let service_spec = None;
1015
1016        // If a WebSocket listen port is configured, build the advertised ws:// address
1017        // to register with the signaling server so clients can discover it.
1018        let ws_address = if let Some(port) = self.config.websocket_listen_port {
1019            let host = self
1020                .config
1021                .websocket_advertised_host
1022                .as_deref()
1023                .unwrap_or("127.0.0.1");
1024            Some(format!("ws://{}:{}", host, port))
1025        } else {
1026            None
1027        };
1028
1029        if let Some(ref addr) = ws_address {
1030            tracing::info!(
1031                "📡 Advertising WebSocket address to signaling server: {}",
1032                addr
1033            );
1034        }
1035
1036        let register_request = RegisterRequest {
1037            actr_type: actr_type.clone(),
1038            realm: self.config.realm,
1039            service_spec,
1040            acl: self.config.acl.clone(),
1041            service: None,
1042            ws_address,
1043            auth_mode: Some(RegisterAuthMode::Linked as i32),
1044            ..Default::default()
1045        };
1046
1047        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
        // 2. Obtain registration info (Hyper pre-injected or AIS HTTP)
1049        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1050        let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1051            tracing::info!(
1052                "Using Hyper pre-injected registration credential; skipping AIS registration"
1053            );
1054            injected
1055        } else {
1056            let ais_endpoint = &self.config.ais_endpoint;
1057            tracing::info!(
1058                ais_endpoint = %ais_endpoint,
1059                "Registering actor with AIS via HTTP"
1060            );
1061            let mut ais = AisClient::new(ais_endpoint);
1062            if let Some(ref secret) = self.config.realm_secret {
1063                ais = ais.with_realm_secret(secret);
1064            }
1065            let resp = ais
1066                .register_linked(register_request.clone())
1067                .await
1068                .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1069            match resp.result {
1070                Some(register_response::Result::Success(ok)) => {
1071                    tracing::info!("✅ AIS HTTP registration successful");
1072                    ok
1073                }
1074                Some(register_response::Result::Error(error)) => {
1075                    tracing::error!(
1076                        severity = 10,
1077                        error_category = "registration_error",
1078                        error_code = error.code,
1079                        "❌ AIS registration failed: code={}, message={}",
1080                        error.code,
1081                        error.message
1082                    );
1083                    return Err(ActrError::Unavailable(format!(
1084                        "AIS registration rejected: {} (code: {})",
1085                        error.message, error.code
1086                    )));
1087                }
1088                None => {
1089                    tracing::error!(
1090                        severity = 10,
1091                        error_category = "registration_error",
1092                        "❌ AIS registration response missing result"
1093                    );
1094                    return Err(ActrError::Unavailable(
1095                        "Invalid AIS registration response: missing result".to_string(),
1096                    ));
1097                }
1098            }
1099        };
1100
1101        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1102        // 3. Set credential on signaling client, then connect signaling WS
1103        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1104        // The signaling server requires credential params in the WS URL for
1105        // authentication. We must set actor_id + credential BEFORE connecting
1106        // so that build_url_with_identity() includes them in the query string.
1107        let pre_connect_credential_state = {
1108            let actor_id = register_ok.actr_id.clone();
1109            let credential_state = CredentialState::new(
1110                register_ok.credential.clone(),
1111                register_ok.credential_expires_at,
1112                Some(register_ok.turn_credential.clone()),
1113            );
1114            self.signaling_client.set_actor_id(actor_id).await;
1115            self.signaling_client
1116                .set_credential_state(credential_state.clone())
1117                .await;
1118            credential_state
1119        };
1120
1121        // Install the signaling-side hook callback so that
1122        // SignalingConnectStart / Connected / Disconnected events flow
1123        // through the framework tracing defaults and into a
1124        // user-installed observer. Done BEFORE connect() so the initial
1125        // attempt produces a SignalingConnectStart event.
1126        {
1127            let actor_id = register_ok.actr_id.clone();
1128            let credential_state = pre_connect_credential_state.clone();
1129            // Snapshot at this point — outproc_gate is still None here, so
1130            // signaling-event contexts will carry None for outproc_gate
1131            // (matching the pre-existing behavior prior to B13 refactor).
1132            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1133            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1134                let snapshot = ctx_builder_snapshot.clone();
1135                let actor_id = actor_id.clone();
1136                let credential_state = credential_state.clone();
1137                Box::pin(async move {
1138                    Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1139                })
1140            });
1141            let cb = crate::lifecycle::hooks::build_hook_callback(
1142                self.hook_observer.clone(),
1143                ctx_builder,
1144            );
1145            self.signaling_client.set_hook_callback(cb);
1146        }
1147
1148        tracing::info!("📡 Connecting to signaling server (with credential)");
1149        self.signaling_client
1150            .connect()
1151            .await
1152            .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1153        tracing::info!("✅ Connected to signaling server");
1154
1155        // Collect background task handles so they can be managed by ActrRefShared later.
1156        let mut task_handles = Vec::new();
1157
1158        // Node-level hook callback, built inside the registration
1159        // setup block below and published back out into this wider
1160        // scope so the mailbox backpressure watchdog can subscribe.
1161        let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1162
1163        {
1164            let actor_id = register_ok.actr_id;
1165            let credential = register_ok.credential;
1166
1167            tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1168            tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1169            tracing::info!(
1170                "💓 Signaling heartbeat interval: {} seconds",
1171                register_ok.signaling_heartbeat_interval_secs
1172            );
1173
1174            // TurnCredential is a required field; should always be present under normal registration.
1175            tracing::debug!("TurnCredential received, TURN authentication ready");
1176
1177            if let Some(expires_at) = &register_ok.credential_expires_at {
1178                tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1179            }
1180
1181            // Store ActrId and credential state
1182            self.actor_id = Some(actor_id.clone());
1183            let credential_state = CredentialState::new(
1184                credential,
1185                register_ok.credential_expires_at,
1186                Some(register_ok.turn_credential.clone()),
1187            );
1188            self.credential_state = Some(credential_state.clone());
1189
1190            // Build the node-level lifecycle hook callback once: it is
1191            // reused for the initial `on_credential_renewed`, handed to
1192            // the heartbeat task for subsequent credential events, and
1193            // handed to the mailbox backpressure watchdog for
1194            // `on_mailbox_backpressure` on rising-edge crossings.
1195            //
1196            // The signaling layer already has its own callback installed
1197            // above — this second callback only carries credential and
1198            // mailbox-backpressure events, so no overlap with the
1199            // signaling-event plumbing.
1200            node_hook_callback =
1201                {
1202                    let actor_id_for_hook = actor_id.clone();
1203                    let credential_state_for_hook = credential_state.clone();
1204                    // Snapshot at this point — outproc_gate is still None
1205                    // here; credential / mailbox hook contexts inherit that
1206                    // and therefore cannot issue Dest::Actor(_) calls (same
1207                    // behavior as before B13 refactor).
1208                    let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1209                    let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1210                        Arc::new(move || {
1211                            let snapshot = ctx_builder_snapshot.clone();
1212                            let actor_id = actor_id_for_hook.clone();
1213                            let credential_state = credential_state_for_hook.clone();
1214                            Box::pin(async move {
1215                                Some(snapshot.build_bootstrap(
1216                                    &actor_id,
1217                                    &credential_state.credential().await,
1218                                ))
1219                            })
1220                        });
1221                    Some(crate::lifecycle::hooks::build_hook_callback(
1222                        self.hook_observer.clone(),
1223                        ctx_builder,
1224                    ))
1225                };
1226
1227            // Fire `on_credential_renewed` at initial registration: the
1228            // credential is considered "renewed" from "nothing" to the
1229            // value just issued by AIS. Subsequent renewals fire the
1230            // same hook from `lifecycle::heartbeat`.
1231            if let Some(expires_at) = &register_ok.credential_expires_at {
1232                let new_expiry = std::time::UNIX_EPOCH
1233                    + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1234                if let Some(cb) = node_hook_callback.as_ref() {
1235                    cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1236                } else {
1237                    tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1238                }
1239            }
1240
1241            // Note: actor_id and credential_state were already set on signaling_client
1242            // before connect (step 3 above), so reconnect URLs already carry correct auth.
1243
1244            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1245            // 1.3. Inproc transports were filled in during `build()`; nothing
1246            //      to stage here now that ContextFactory has been removed.
1247            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1248            tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1249
1250            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1251            // 1.5. Create WebRTC infrastructure
1252            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1253            tracing::info!("🌐 Initializing WebRTC infrastructure");
1254
1255            let media_frame_registry = self.media_frame_registry.clone();
1256
1257            // Create WebRtcCoordinator
1258            let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1259                actor_id.clone(),
1260                credential_state.clone(),
1261                self.signaling_client.clone(),
1262                self.config.webrtc.clone(),
1263                media_frame_registry,
1264            ));
1265
1266            // Install the WebRTC hook callback — fires
1267            // WebRtcConnectStart / Connected (with relayed info) /
1268            // Disconnected HookEvents on every peer state change.
1269            {
1270                let actor_id_for_hook = actor_id.clone();
1271                let credential_state_for_hook = credential_state.clone();
1272                // Snapshot before outproc_gate is wired up (just below). This
1273                // preserves the pre-refactor behavior where WebRTC-event
1274                // hook contexts carry outproc_gate = None.
1275                let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1276                let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1277                    Arc::new(move || {
1278                        let snapshot = ctx_builder_snapshot.clone();
1279                        let actor_id = actor_id_for_hook.clone();
1280                        let credential_state = credential_state_for_hook.clone();
1281                        Box::pin(async move {
1282                            Some(
1283                                snapshot.build_bootstrap(
1284                                    &actor_id,
1285                                    &credential_state.credential().await,
1286                                ),
1287                            )
1288                        })
1289                    });
1290                let cb = crate::lifecycle::hooks::build_hook_callback(
1291                    self.hook_observer.clone(),
1292                    ctx_builder,
1293                );
1294                coordinator.set_hook_callback(cb);
1295            }
1296
1297            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1298            // 1.6. Create PeerTransport + PeerGate (new architecture)
1299            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1300            tracing::info!("🏗️  Creating PeerTransport with WebRTC support");
1301
1302            // Create DefaultWireBuilder with WebRTC coordinator
1303            use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1304
1305            // WebSocket channel always enabled: target ws:// address is fully discovered at runtime
1306            // Direct-connect mode: encode local node ActrId as hex, sent as X-Actr-Node-Id
1307            let local_id_hex = hex::encode(actor_id.encode_to_vec());
1308            let wire_builder_config = DefaultWireBuilderConfig {
1309                local_id_hex,
1310                enable_webrtc: true,
1311                enable_websocket: true,
1312                // Share the discovered_ws_addresses map so that post-discovery calls
1313                // can use the signaling-provided ws:// URL for this actor node.
1314                discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1315                // Pass credential_state so outbound WS handshake carries X-Actr-Credential,
1316                // enabling peer WebSocketGate to perform Ed25519 signature verification.
1317                credential_state: Some(credential_state.clone()),
1318            };
1319            let wire_builder = Arc::new(DefaultWireBuilder::new(
1320                Some(coordinator.clone()),
1321                wire_builder_config,
1322            ));
1323
1324            // Create PeerTransport
1325            use crate::transport::PeerTransport;
1326            let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1327
1328            // Create PeerGate with WebRTC coordinator for MediaTrack support
1329            use crate::outbound::PeerGate;
1330            let outproc_gate =
1331                Arc::new(PeerGate::new(transport_manager, Some(coordinator.clone())));
1332            let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1333            tracing::info!("PeerTransport + PeerGate initialized");
1334
1335            let data_stream_registry = self.data_stream_registry.clone();
1336
1337            // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1338            let pending_requests = outproc_gate.get_pending_requests();
1339            let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1340                coordinator.clone(),
1341                pending_requests,
1342                data_stream_registry.clone(),
1343            ));
1344            // Set local_id
1345            gate.set_local_id(actor_id.clone()).await;
1346            tracing::info!(
1347                "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1348            );
1349
1350            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1351            // 1.7. Wire the outproc gate into Inner so subsequent
1352            //      `make_runtime_context` / `bootstrap_ctx_builder` calls
1353            //      observe it. All per-request contexts created by
1354            //      `handle_incoming` go through this field live.
1355            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1356            tracing::info!("🔧 Wiring outproc_gate into node");
1357            self.outproc_gate = Some(outproc_gate_enum);
1358            tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1359
1360            // Save references
1361            self.webrtc_coordinator = Some(coordinator.clone());
1362            self.webrtc_gate = Some(gate.clone());
1363            tracing::info!("✅ WebRTC infrastructure initialized");
1364
1365            // Fire `on_start` once the runtime context can see the initialized
1366            // gates, before starting request-accepting/background loops. Its
1367            // Err/panic aborts Node::start.
1368            {
1369                let startup_ctx = self
1370                    .bootstrap_ctx_builder()
1371                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1372                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1373                let call_executor =
1374                    lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1375                let mut workload = self.workload_dispatch.lock().await;
1376                crate::lifecycle::hooks::call_lifecycle_hook(
1377                    "on_start",
1378                    workload.on_start(startup_ctx, invocation, &call_executor),
1379                )
1380                .await?;
1381            }
1382
1383            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1384            // 1.7.6. WebSocket Server (direct-connect mode, optional)
1385            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1386            if let Some(listen_port) = self.config.websocket_listen_port {
1387                tracing::info!(
1388                    "🔌 WebSocket direct-connect mode enabled, binding port {}",
1389                    listen_port
1390                );
1391                use crate::key_cache::AisKeyCache;
1392                use crate::wire::websocket::gate::WsAuthContext;
1393                use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1394
1395                // Build AisKeyCache and seed it with the signing key from the registration response
1396                let ais_key_cache = AisKeyCache::new();
1397                if !register_ok.signing_pubkey.is_empty() {
1398                    match ais_key_cache
1399                        .seed(register_ok.signing_key_id, &register_ok.signing_pubkey)
1400                        .await
1401                    {
1402                        Ok(()) => tracing::info!(
1403                            key_id = register_ok.signing_key_id,
1404                            "🔑 AisKeyCache seeded from RegisterOk"
1405                        ),
1406                        Err(e) => tracing::warn!(
1407                            key_id = register_ok.signing_key_id,
1408                            error = ?e,
1409                            "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1410                        ),
1411                    }
1412                } else {
1413                    tracing::warn!(
1414                        "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1415                    );
1416                }
1417
1418                let auth_ctx = WsAuthContext {
1419                    ais_key_cache,
1420                    actor_id: actor_id.clone(),
1421                    credential_state: credential_state.clone(),
1422                    signaling_client: self.signaling_client.clone(),
1423                };
1424
1425                match WebSocketServer::bind(listen_port).await {
1426                    Ok((ws_server, conn_rx)) => {
1427                        ws_server.start(self.shutdown_token.clone());
1428                        let ws_gate = Arc::new(WebSocketGate::new(
1429                            conn_rx,
1430                            outproc_gate.get_pending_requests(),
1431                            data_stream_registry.clone(),
1432                            Some(auth_ctx),
1433                        ));
1434
1435                        // Install the WebSocket peer-lifecycle hook.
1436                        {
1437                            let actor_id_for_hook = actor_id.clone();
1438                            let credential_state_for_hook = credential_state.clone();
1439                            // Snapshot taken after outproc_gate is live: ws
1440                            // peer-lifecycle hook contexts can issue
1441                            // Dest::Actor(_) calls.
1442                            let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1443                            let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1444                                Arc::new(move || {
1445                                    let snapshot = ctx_builder_snapshot.clone();
1446                                    let actor_id = actor_id_for_hook.clone();
1447                                    let credential_state = credential_state_for_hook.clone();
1448                                    Box::pin(async move {
1449                                        Some(snapshot.build_bootstrap(
1450                                            &actor_id,
1451                                            &credential_state.credential().await,
1452                                        ))
1453                                    })
1454                                });
1455                            let cb = crate::lifecycle::hooks::build_hook_callback(
1456                                self.hook_observer.clone(),
1457                                ctx_builder,
1458                            );
1459                            ws_gate.set_hook_callback(cb);
1460                        }
1461
1462                        self.websocket_gate = Some(ws_gate);
1463                        tracing::info!(
1464                            "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1465                        );
1466                    }
1467                    Err(e) => {
1468                        tracing::error!(
1469                            "❌ Failed to bind WebSocket server on port {}: {:?}",
1470                            listen_port,
1471                            e
1472                        );
1473                    }
1474                }
1475            }
1476
1477            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1478            // 1.7.5. Create shared state for credential management
1479            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1480            // Shared credential state initialized above; reused across tasks
1481
1482            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1483            // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1484            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1485            {
1486                let shutdown = self.shutdown_token.clone();
1487                let client = self.signaling_client.clone();
1488                let actor_id_for_heartbeat = actor_id.clone();
1489                let credential_state_for_heartbeat = credential_state.clone();
1490                let mailbox_for_heartbeat = self.mailbox.clone();
1491                let register_request_for_heartbeat = register_request.clone();
1492
1493                // Use interval from registration response, default to 30s
1494                let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1495                let heartbeat_interval = if heartbeat_interval_secs > 0 {
1496                    Duration::from_secs(heartbeat_interval_secs as u64)
1497                } else {
1498                    Duration::from_secs(30)
1499                };
1500                let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1501                let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1502                    shutdown,
1503                    client,
1504                    actor_id_for_heartbeat,
1505                    credential_state_for_heartbeat,
1506                    mailbox_for_heartbeat,
1507                    heartbeat_interval,
1508                    register_request_for_heartbeat,
1509                    ais_endpoint_for_heartbeat,
1510                    node_hook_callback.clone(),
1511                ));
1512                task_handles.push(heartbeat_handle);
1513            }
1514            tracing::info!(
1515                "✅ Heartbeat task started (interval: {}s)",
1516                register_ok.signaling_heartbeat_interval_secs
1517            );
1518
1519            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1520            // 1.8.5. Spawn network event processing loop
1521            // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1522            if let (Some(event_rx), Some(result_tx)) = (
1523                self.network_event_rx.take(),
1524                self.network_event_result_tx.take(),
1525            ) {
1526                use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1527
1528                // Create DefaultNetworkEventProcessor
1529                // If debounce config exists, use new_with_debounce
1530                let event_processor =
1531                    if let Some(config) = self.network_event_debounce_config.clone() {
1532                        Arc::new(DefaultNetworkEventProcessor::new_with_debounce(
1533                            self.signaling_client.clone(),
1534                            self.webrtc_coordinator.clone(),
1535                            config,
1536                        ))
1537                    } else {
1538                        Arc::new(DefaultNetworkEventProcessor::new(
1539                            self.signaling_client.clone(),
1540                            self.webrtc_coordinator.clone(),
1541                        ))
1542                    };
1543
1544                let shutdown = self.shutdown_token.clone();
1545                let network_event_handle = tokio::spawn(async move {
1546                    Self::network_event_loop(event_rx, result_tx, event_processor, shutdown).await;
1547                });
1548                task_handles.push(network_event_handle);
1549                tracing::info!("✅ Network event loop started");
1550            }
1551
1552            {
1553                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1554                // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1555                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1556                //
1557                // This task:
1558                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1559                // - Then sends UnregisterRequest via signaling client with a timeout
1560                //
1561                // NOTE: we push its JoinHandle into task_handles so it can be aborted
1562                // by ActrRefShared::Drop if needed.
1563                let shutdown = self.shutdown_token.clone();
1564                let client = self.signaling_client.clone();
1565                let actor_id_for_unreg = actor_id.clone();
1566                let credential_state_for_unreg = credential_state.clone();
1567                let webrtc_coordinator = self.webrtc_coordinator.clone();
1568
1569                let unregister_handle = tokio::spawn(async move {
1570                    // Wait for shutdown signal
1571                    shutdown.cancelled().await;
1572                    tracing::info!(
1573                        "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1574                        actor_id_for_unreg
1575                    );
1576
1577                    // 1. Close all WebRTC peer connections first (if any)
1578                    if let Some(coord) = webrtc_coordinator {
1579                        if let Err(e) = coord.close_all_peers().await {
1580                            tracing::warn!(
1581                                "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1582                                e
1583                            );
1584                        } else {
1585                            tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1586                        }
1587                    } else {
1588                        tracing::debug!(
1589                            "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1590                        );
1591                    }
1592
1593                    // 2. Then send UnregisterRequest with a timeout (e.g. 5 seconds)
1594                    let result = tokio::time::timeout(
1595                        Duration::from_secs(5),
1596                        client.send_unregister_request(
1597                            actor_id_for_unreg.clone(),
1598                            credential_state_for_unreg.credential().await,
1599                            Some("Graceful shutdown".to_string()),
1600                        ),
1601                    )
1602                    .await;
1603                    tracing::info!("UnregisterRequest result: {:?}", result);
1604                    match result {
1605                        Ok(Ok(_)) => {
1606                            tracing::info!(
1607                                "✅ UnregisterRequest sent to signaling server for Actor {}",
1608                                actor_id_for_unreg
1609                            );
1610                        }
1611                        Ok(Err(e)) => {
1612                            tracing::warn!(
1613                                "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1614                                actor_id_for_unreg,
1615                                e
1616                            );
1617                        }
1618                        Err(_) => {
1619                            tracing::warn!(
1620                                "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1621                                actor_id_for_unreg
1622                            );
1623                        }
1624                    }
1625                });
1626
1627                task_handles.push(unregister_handle);
1628            }
1629        } // end registration setup block
1630
1631        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1632        // 2. Transport layer initialization (completed via WebRTC infrastructure)
1633        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1634        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1635
1636        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1637        // 3.1 Convert to Arc (before starting background loops)
1638        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1639        // Clone actor_id before moving self into Arc
1640        let actor_id = self
1641            .actor_id
1642            .as_ref()
1643            .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1644            .clone();
1645        // Snapshot now that outproc_gate has been wired above; this builder
1646        // is shared between on_start / on_stop hooks and the ActrRefShared
1647        // handle returned to the caller.
1648        let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1649        let credential_state = self
1650            .credential_state
1651            .clone()
1652            .expect("CredentialState must be initialized in start()");
1653        let shutdown_token = self.shutdown_token.clone();
1654        let node_ref = Arc::new(self);
1655
1656        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1657        // 3.2. Register workload-level stop hook.
1658        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1659        {
1660            let node = node_ref.clone();
1661            let actor_id = actor_id.clone();
1662            let credential_state = credential_state.clone();
1663            let shutdown = shutdown_token.clone();
1664            let on_stop_handle = tokio::spawn(async move {
1665                shutdown.cancelled().await;
1666                let stop_ctx = node
1667                    .bootstrap_ctx_builder()
1668                    .build_bootstrap(&actor_id, &credential_state.credential().await);
1669                let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1670                let call_executor =
1671                    lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1672                let mut workload = node.workload_dispatch.lock().await;
1673                if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1674                    "on_stop",
1675                    workload.on_stop(stop_ctx, invocation, &call_executor),
1676                )
1677                .await
1678                {
1679                    tracing::warn!(error = %e, "workload on_stop returned Err");
1680                }
1681            });
1682            task_handles.push(on_stop_handle);
1683        }
1684
1685        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1686        // 3.5. Start WebRTC background loops
1687        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1688        tracing::info!("🚀 Starting WebRTC background loops");
1689
1690        // Start WebRtcCoordinator signaling loop
1691        if let Some(coordinator) = &node_ref.webrtc_coordinator {
1692            coordinator.clone().start().await.map_err(|e| {
1693                ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1694            })?;
1695            tracing::info!("✅ WebRtcCoordinator signaling loop started");
1696        }
1697
1698        // Start WebRtcGate message receive loop (route to Mailbox)
1699        if let Some(gate) = &node_ref.webrtc_gate {
1700            gate.start_receive_loop(node_ref.mailbox.clone())
1701                .await
1702                .map_err(|e| {
1703                    ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1704                })?;
1705            tracing::info!("✅ WebRtcGate → Mailbox routing started");
1706        }
1707
1708        // Start WebSocketGate message receive loop (route to Mailbox, direct-connect mode)
1709        if let Some(ws_gate) = &node_ref.websocket_gate {
1710            ws_gate
1711                .start_receive_loop(node_ref.mailbox.clone())
1712                .await
1713                .map_err(|e| {
1714                    ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1715                })?;
1716            tracing::info!("✅ WebSocketGate → Mailbox routing started");
1717        }
1718        tracing::info!("✅ WebRTC background loops started");
1719
1720        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1721        // 4.6. Start Inproc receive loop (Shell → Guest)
1722        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1723        if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1724            tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1725            // Start Guest receive loop (Shell → Guest REQUEST)
1726            if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1727                let node = node_ref.clone();
1728                let request_rx_lane = shell_to_workload
1729                    .get_lane(PayloadType::RpcReliable, None)
1730                    .await
1731                    .map_err(|e| {
1732                        ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1733                    })?;
1734                let response_tx = workload_to_shell.clone();
1735                let shutdown = shutdown_token.clone();
1736
1737                let inproc_handle = tokio::spawn(async move {
1738                    loop {
1739                        tokio::select! {
1740                            _ = shutdown.cancelled() => {
1741                                tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1742                                break;
1743                            }
1744                            envelope_result = request_rx_lane.recv_envelope() => {
1745                                match envelope_result {
1746                                    Ok(envelope) => {
1747                                        let request_id = envelope.request_id.clone();
1748                                        tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1749                                        // Extract and set tracing context from envelope
1750                                        #[cfg(feature = "opentelemetry")]
1751                                        let span = {
1752                                            let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1753                                            let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1754                                            set_parent_from_rpc_envelope(&span, &envelope);
1755                                            span
1756                                        };
1757
1758                                        // Shell calls have no caller_id (local process communication)
1759                                        let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1760                                        #[cfg(feature = "opentelemetry")]
1761                                        let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1762
1763                                        match handle_incoming_fut.await {
1764                                            Ok(response_bytes) => {
1765                                                // Send RESPONSE back via workload_to_shell
1766                                                // Keep same route_key (no prefix needed - separate channels!)
1767                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1768                                                let mut response_envelope = RpcEnvelope {
1769                                                    route_key: envelope.route_key.clone(),
1770                                                    payload: Some(response_bytes),
1771                                                    error: None,
1772                                                    traceparent: None,
1773                                                    tracestate: None,
1774                                                    request_id: request_id.clone(),
1775                                                    metadata: Vec::new(),
1776                                                    timeout_ms: 30000,
1777                                                };
1778                                                // Inject tracing context
1779                                                #[cfg(feature = "opentelemetry")]
1780                                                inject_span_context_to_rpc(&span, &mut response_envelope);
1781
1782                                                // Send via Guest → Shell channel
1783                                                let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1784                                                #[cfg(feature = "opentelemetry")]
1785                                                let send_response_fut = send_response_fut.instrument(span.clone());
1786                                                if let Err(e) = send_response_fut.await {
1787                                                    tracing::error!(
1788                                                        severity = 7,
1789                                                        error_category = "transport_error",
1790                                                        request_id = %request_id,
1791                                                        "❌ Failed to send RESPONSE to Shell: {:?}",
1792                                                        e
1793                                                    );
1794                                                }
1795                                            }
1796                                            Err(e) => {
1797                                                tracing::error!(
1798                                                    severity = 6,
1799                                                    error_category = "handler_error",
1800                                                    request_id = %request_id,
1801                                                    route_key = %envelope.route_key,
1802                                                    "❌ Guest message handling failed: {:?}",
1803                                                    e
1804                                                );
1805
1806                                                // Send error response (system-level error on envelope)
1807                                                let error_response = actr_protocol::ErrorResponse {
1808                                                    code: protocol_error_to_code(&e),
1809                                                    message: e.to_string(),
1810                                                };
1811                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1812                                                let mut error_envelope = RpcEnvelope {
1813                                                    route_key: envelope.route_key.clone(),
1814                                                    payload: None,
1815                                                    error: Some(error_response),
1816                                                    traceparent: envelope.traceparent.clone(),
1817                                                    tracestate: envelope.tracestate.clone(),
1818                                                    request_id: request_id.clone(),
1819                                                    metadata: Vec::new(),
1820                                                    timeout_ms: 30000,
1821                                                };
1822                                                // Inject tracing context
1823                                                #[cfg(feature = "opentelemetry")]
1824                                                inject_span_context_to_rpc(&span, &mut error_envelope);
1825
1826                                                let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1827                                                #[cfg(feature = "opentelemetry")]
1828                                                let send_error_response_fut = send_error_response_fut.instrument(span);
1829                                                if let Err(send_err) = send_error_response_fut.await {
1830                                                    tracing::error!(
1831                                                        severity = 7,
1832                                                        error_category = "transport_error",
1833                                                        request_id = %request_id,
1834                                                        "❌ Failed to send ERROR response to Shell: {:?}",
1835                                                        send_err
1836                                                    );
1837                                                }
1838                                            }
1839                                        }
1840                                    }
1841                                    Err(e) => {
1842                                        tracing::error!(
1843                                            severity = 8,
1844                                            error_category = "transport_error",
1845                                            "❌ Failed to receive from Shell → Guest lane: {:?}",
1846                                            e
1847                                        );
1848                                        break;
1849                                    }
1850                                }
1851                            }
1852                        }
1853                    }
1854                    tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1855                });
1856                task_handles.push(inproc_handle);
1857            }
1858        }
1859        tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1860
1861        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1862        // 4.7. Start Shell receive loop (Guest → Shell RESPONSE)
1863        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1864        tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1865        if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1866            // Start Shell receive loop (Guest → Shell RESPONSE)
1867            if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1868                let response_rx_lane = workload_to_shell
1869                    .get_lane(PayloadType::RpcReliable, None)
1870                    .await
1871                    .map_err(|e| {
1872                        ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1873                    })?;
1874                let request_mgr = shell_to_workload.clone();
1875                let shutdown = shutdown_token.clone();
1876
1877                let shell_receive_handle = tokio::spawn(async move {
1878                    loop {
1879                        tokio::select! {
1880                            _ = shutdown.cancelled() => {
1881                                tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1882                                break;
1883                            }
1884                            envelope_result = response_rx_lane.recv_envelope() => {
1885                                match envelope_result {
1886                                    Ok(envelope) => {
1887                                        tracing::debug!(
1888                                            "📨 Shell received RESPONSE from Guest: request_id={}",
1889                                            envelope.request_id
1890                                        );
1891
1892                                        // Check if response is success or error
1893                                        match (envelope.payload, envelope.error) {
1894                                            (Some(payload), None) => {
1895                                                // Success response
1896                                                if let Err(e) = request_mgr
1897                                                    .complete_response(&envelope.request_id, payload)
1898                                                    .await
1899                                                {
1900                                                    tracing::warn!(
1901                                                        severity = 4,
1902                                                        error_category = "orphan_response",
1903                                                        request_id = %envelope.request_id,
1904                                                        "⚠️  No pending request found for response: {:?}",
1905                                                        e
1906                                                    );
1907                                                }
1908                                            }
1909                                            (None, Some(error)) => {
1910                                                // Error response - convert to ActrError and complete with error
1911                                                let actr_err = ActrError::Unavailable(format!("RPC error {}: {}", error.code, error.message));
1912                                                if let Err(e) = request_mgr
1913                                                    .complete_error(&envelope.request_id, actr_err)
1914                                                    .await
1915                                                {
1916                                                    tracing::warn!(
1917                                                        severity = 4,
1918                                                        error_category = "orphan_response",
1919                                                        request_id = %envelope.request_id,
1920                                                        "⚠️  No pending request found for error response: {:?}",
1921                                                        e
1922                                                    );
1923                                                }
1924                                            }
1925                                            _ => {
1926                                                tracing::error!(
1927                                                    severity = 7,
1928                                                    error_category = "protocol_error",
1929                                                    request_id = %envelope.request_id,
1930                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1931                                                );
1932                                            }
1933                                        }
1934                                    }
1935                                    Err(e) => {
1936                                        tracing::error!(
1937                                            severity = 8,
1938                                            error_category = "transport_error",
1939                                            "❌ Failed to receive from Guest → Shell lane: {:?}",
1940                                            e
1941                                        );
1942                                        break;
1943                                    }
1944                                }
1945                            }
1946                        }
1947                    }
1948                    tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
1949                });
1950                task_handles.push(shell_receive_handle);
1951            }
1952        }
1953        tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
1954
1955        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1956        // 4.9. Mailbox backpressure watchdog
1957        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1958        //
1959        // Emits the framework `on_mailbox_backpressure` hook once per
1960        // rising-edge crossing of the configured threshold.
1961        //
1962        // Preferred path: a push-based notification from the mailbox
1963        // backend via [`Mailbox::set_depth_observer`], which runs
1964        // synchronously on every enqueue and has zero worst-case delay.
1965        //
1966        // Fallback path: mailbox backends without depth support (or
1967        // which can't cheaply compute depth on every enqueue) keep
1968        // using a 1 Hz poll of [`Mailbox::status`].
1969        let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
1970        {
1971            use std::sync::atomic::{AtomicBool, Ordering};
1972            let mailbox = node_ref.mailbox.clone();
1973            let shutdown = shutdown_token.clone();
1974            let hook_cb = node_hook_callback.clone();
1975            let triggered = Arc::new(AtomicBool::new(false));
1976
1977            // Shared rising-edge state + hook-firing closure used by
1978            // both the push and polling code paths.
1979            let fire_if_rising = {
1980                let triggered = triggered.clone();
1981                let hook_cb = hook_cb.clone();
1982                Arc::new(move |queue_len: usize| {
1983                    if queue_len >= backpressure_threshold {
1984                        if !triggered.swap(true, Ordering::AcqRel) {
1985                            if let Some(cb) = hook_cb.as_ref() {
1986                                let cb = cb.clone();
1987                                tokio::spawn(async move {
1988                                    cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
1989                                        queue_len,
1990                                        threshold: backpressure_threshold,
1991                                    })
1992                                    .await;
1993                                });
1994                            } else {
1995                                tracing::warn!(
1996                                    queue_len,
1997                                    threshold = backpressure_threshold,
1998                                    "mailbox backpressure",
1999                                );
2000                            }
2001                        }
2002                    } else if triggered.swap(false, Ordering::AcqRel) {
2003                        tracing::info!(
2004                            queue_len,
2005                            threshold = backpressure_threshold,
2006                            "mailbox backpressure cleared",
2007                        );
2008                    }
2009                })
2010            };
2011
2012            // Try the push path first. The observer installs only if
2013            // the backend supports it; otherwise `installed` is `false`
2014            // and we fall through to polling.
2015            struct EnqueueObserver {
2016                fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2017            }
2018            impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2019                fn on_depth_change(&self, queued_messages: usize) {
2020                    (self.fire)(queued_messages);
2021                }
2022            }
2023
2024            let installed = {
2025                let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2026                    Arc::new(EnqueueObserver {
2027                        fire: fire_if_rising.clone(),
2028                    });
2029                mailbox.set_depth_observer(observer)
2030            };
2031
2032            if installed {
2033                tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2034            } else {
2035                tracing::debug!(
2036                    "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2037                );
2038                let mailbox_for_poll = mailbox.clone();
2039                let shutdown_for_poll = shutdown.clone();
2040                let fire_for_poll = fire_if_rising.clone();
2041                let watchdog_handle = tokio::spawn(async move {
2042                    let mut ticker = tokio::time::interval(Duration::from_secs(1));
2043                    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2044                    loop {
2045                        tokio::select! {
2046                            _ = shutdown_for_poll.cancelled() => {
2047                                tracing::debug!(
2048                                    "mailbox backpressure watchdog shutting down"
2049                                );
2050                                break;
2051                            }
2052                            _ = ticker.tick() => {
2053                                let status = match mailbox_for_poll.status().await {
2054                                    Ok(s) => s,
2055                                    Err(e) => {
2056                                        tracing::debug!(?e, "mailbox status poll failed");
2057                                        continue;
2058                                    }
2059                                };
2060                                fire_for_poll(status.queued_messages as usize);
2061                            }
2062                        }
2063                    }
2064                });
2065                task_handles.push(watchdog_handle);
2066            }
2067        }
2068
2069        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2070        // 5. Start Mailbox processing loop (State Path)
2071        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2072        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2073        {
2074            let node = node_ref.clone();
2075            let mailbox = node_ref.mailbox.clone();
2076            let gate = node_ref.webrtc_gate.clone();
2077            let shutdown = shutdown_token.clone();
2078
2079            let mailbox_handle = tokio::spawn(async move {
2080                loop {
2081                    tokio::select! {
2082                        // Listen for shutdown signal
2083                        _ = shutdown.cancelled() => {
2084                            tracing::info!("📭 Mailbox loop received shutdown signal");
2085                            break;
2086                        }
2087                        // Dequeue messages (by priority)
2088                        result = mailbox.dequeue() => {
2089                            match result {
2090                                Ok(messages) => {
2091                                    if messages.is_empty() {
2092                                        // Queue empty, sleep briefly
2093                                        tokio::time::sleep(Duration::from_millis(10)).await;
2094                                        continue;
2095                                    }
2096                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2097
2098                                    // Process messages one by one
2099                                    for msg_record in messages {
2100                                        // Deserialize RpcEnvelope (Protobuf)
2101                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
2102                                            Ok(envelope) => {
2103                                                let request_id = envelope.request_id.clone();
2104                                                let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2105                                                tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2106
2107                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
2108                                                #[cfg(feature = "opentelemetry")]
2109                                                let span = {
2110                                                    let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2111                                                    let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2112                                                    set_parent_from_rpc_envelope(&span, &envelope);
2113                                                    span
2114                                                };
2115
2116                                                // Decode caller_id from MessageRecord.from (transport layer)
2117                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
2118                                                let caller_id_ref = caller_id_result.as_ref().ok();
2119
2120                                                if caller_id_ref.is_none() {
2121                                                    tracing::warn!(
2122                                                        request_id = %request_id,
2123                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
2124                                                    );
2125                                                }
2126
2127                                                // Call handle_incoming with caller_id from transport layer
2128                                                let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2129                                                #[cfg(feature = "opentelemetry")]
2130                                                let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2131
2132                                                match handle_incoming_fut.await {
2133                                                    Ok(response_bytes) => {
2134                                                        // Send response (reuse request_id)
2135                                                        if let Some(ref gate) = gate {
2136                                                            // Use already decoded caller_id
2137                                                            match caller_id_result {
2138                                                                Ok(caller) => {
2139                                                                    // Construct response RpcEnvelope (reuse request_id!)
2140                                                                    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2141                                                                    let mut response_envelope = RpcEnvelope {
2142                                                                        request_id, // Reuse!
2143                                                                        route_key: envelope.route_key.clone(),
2144                                                                        payload: Some(response_bytes),
2145                                                                        error: None,
2146                                                                        traceparent: envelope.traceparent.clone(),
2147                                                                        tracestate: envelope.tracestate.clone(),
2148                                                                        metadata: Vec::new(), // Response doesn't need extra metadata
2149                                                                        timeout_ms: 30000,
2150                                                                    };
2151                                                                    // Inject tracing context
2152                                                                    #[cfg(feature = "opentelemetry")]
2153                                                                    inject_span_context_to_rpc(&span, &mut response_envelope);
2154
2155                                                                    let send_response_fut = gate.send_response(&caller, response_envelope);
2156                                                                    #[cfg(feature = "opentelemetry")]
2157                                                                    let send_response_fut = send_response_fut.instrument(span);
2158                                                                    if let Err(e) = send_response_fut.await {
2159                                                                        tracing::error!(
2160                                                                            severity = 7,
2161                                                                            error_category = "transport_error",
2162                                                                            request_id = %envelope.request_id,
2163                                                                            "❌ Failed to send response: {:?}",
2164                                                                            e
2165                                                                        );
2166                                                                    }
2167                                                                }
2168                                                                Err(e) => {
2169                                                                    tracing::error!(
2170                                                                        severity = 8,
2171                                                                        error_category = "protobuf_decode",
2172                                                                        request_id = %envelope.request_id,
2173                                                                        "❌ Failed to decode caller_id: {:?}",
2174                                                                        e
2175                                                                    );
2176                                                                }
2177                                                            }
2178                                                        }
2179
2180                                                        // ACK message
2181                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
2182                                                            tracing::error!(
2183                                                                severity = 9,
2184                                                                error_category = "mailbox_error",
2185                                                                request_id = %envelope.request_id,
2186                                                                message_id = %msg_record.id,
2187                                                                "❌ Mailbox ACK failed: {:?}",
2188                                                                e
2189                                                            );
2190                                                        }
2191                                                    }
2192                                                    Err(e) => {
2193                                                        tracing::error!(
2194                                                            severity = 6,
2195                                                            error_category = "handler_error",
2196                                                            request_id = %envelope.request_id,
2197                                                            route_key = %envelope.route_key,
2198                                                            "❌ handle_incoming failed: {:?}", e
2199                                                        );
2200                                                        // ACK to avoid infinite retries
2201                                                        // Application errors are caller's responsibility
2202                                                        let _ = mailbox.ack(msg_record.id).await;
2203                                                    }
2204                                                }
2205                                            }
2206                                            Err(e) => {
2207                                                // Poison message - cannot decode RpcEnvelope
2208                                                tracing::error!(
2209                                                    severity = 9,
2210                                                    error_category = "protobuf_decode",
2211                                                    message_id = %msg_record.id,
2212                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2213                                                    e
2214                                                );
2215
2216                                                // Write to Dead Letter Queue
2217                                                use actr_runtime_mailbox::DlqRecord;
2218                                                use chrono::Utc;
2219                                                use uuid::Uuid;
2220
2221                                                let dlq_record = DlqRecord {
2222                                                    id: Uuid::new_v4(),
2223                                                    original_message_id: Some(msg_record.id.to_string()),
2224                                                    from: Some(msg_record.from.clone()),
2225                                                    to: node.actor_id.as_ref().map(|id| {
2226                                                        let mut buf = Vec::new();
2227                                                        id.encode(&mut buf).unwrap();
2228                                                        buf
2229                                                    }),
2230                                                    raw_bytes: msg_record.payload.clone(),
2231                                                    error_message: format!("Protobuf decode failed: {e}"),
2232                                                    error_category: "protobuf_decode".to_string(),
2233                                                    trace_id: format!("mailbox-{}", msg_record.id),
2234                                                    request_id: None,
2235                                                    created_at: Utc::now(),
2236                                                    redrive_attempts: 0,
2237                                                    last_redrive_at: None,
2238                                                    context: Some(format!(
2239                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
2240                                                        match msg_record.priority {
2241                                                            actr_runtime_mailbox::MessagePriority::High => "high",
2242                                                            actr_runtime_mailbox::MessagePriority::Normal => "normal",
2243                                                        }
2244                                                    )),
2245                                                };
2246
2247                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2248                                                    tracing::error!(
2249                                                        severity = 10,
2250                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2251                                                        dlq_err
2252                                                    );
2253                                                } else {
2254                                                    tracing::warn!(
2255                                                        severity = 9,
2256                                                        "☠️ Poison message moved to DLQ: message_id={}",
2257                                                        msg_record.id
2258                                                    );
2259                                                }
2260
2261                                                // ACK the poison message to remove from mailbox
2262                                                let _ = mailbox.ack(msg_record.id).await;
2263                                            }
2264                                        }
2265                                    }
2266                                }
2267                                Err(e) => {
2268                                    tracing::error!(
2269                                        severity = 9,
2270                                        error_category = "mailbox_error",
2271                                        "❌ Mailbox dequeue failed: {:?}", e
2272                                    );
2273                                    tokio::time::sleep(Duration::from_secs(1)).await;
2274                                }
2275                            }
2276                        }
2277                    }
2278                }
2279                tracing::info!("✅ Mailbox processing loop terminated gracefully");
2280            });
2281
2282            task_handles.push(mailbox_handle);
2283        }
2284        tracing::info!("✅ Mailbox processing loop started");
2285        tracing::info!("✅ ActrNode started successfully");
2286
2287        {
2288            let ready_ctx = bootstrap_ctx_builder
2289                .build_bootstrap(&actor_id, &credential_state.credential().await);
2290            let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2291            let call_executor =
2292                lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2293            let mut workload = node_ref.workload_dispatch.lock().await;
2294            if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2295                "on_ready",
2296                workload.on_ready(ready_ctx, invocation, &call_executor),
2297            )
2298            .await
2299            {
2300                tracing::warn!(error = %e, "workload on_ready returned Err");
2301            }
2302        }
2303
2304        // Create ActrRefShared
2305        let shared = Arc::new(ActrRefShared {
2306            actor_id,
2307            bootstrap_ctx_builder,
2308            credential_state,
2309            shutdown_token,
2310            task_handles: Mutex::new(task_handles),
2311        });
2312
2313        // Create ActrRef
2314        tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2315
2316        Ok(ActrRef { shared })
2317    }
2318}