actr_hyper/lifecycle/node.rs
1//! Node runtime inner — holds all running-state fields for an attached node.
2//!
3//! This module is the internal implementation backing the public
4//! `Node<Attached>` / `Node<Registered>` typestate chain defined in
5//! `crate::lib`. The struct itself is crate-private; consumers interact with
6//! it indirectly through `Node<S>` → `ActrRef` transitions.
7
8use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21 AIdCredential, ActorResult, ActrError, ActrId, PayloadType, RegisterAuthMode, RegisterRequest,
22 RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
/// Internal running-state of an attached node.
///
/// Holds every field required to run a workload after `Hyper::attach` has
/// bound a package. Kept private to the crate: external callers use the
/// public `Node<S>` wrappers in `crate::lib` and the `ActrRef` handle
/// returned by `Node::start`.
///
/// Fields typed as `Option<_>` that `build()` initializes to `None` are
/// expected to be filled in later by the startup path (not visible in this
/// part of the file).
pub(crate) struct Inner {
    /// Runtime configuration.
    pub(crate) config: actr_config::RuntimeConfig,

    /// SQLite persistent mailbox.
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages.
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// In-process gate for `Dest::Shell` / `Dest::Local` calls.
    ///
    /// Created in `build()` together with `shell_to_workload` so the inproc
    /// lane is usable as soon as the node exists, even before registration.
    pub(crate) inproc_gate: Gate,

    /// Cross-process gate for `Dest::Actor(_)` calls.
    ///
    /// `None` until `start()` finishes WebRTC / PeerGate initialization. Any
    /// outbound call issued before that point returns `Internal("PeerGate
    /// not initialized yet")` — see `RuntimeContext::select_gate`.
    pub(crate) outproc_gate: Option<Gate>,

    /// DataStream callback registry shared between the inbound WebRTC / WS
    /// gates (which dispatch into it) and `RuntimeContext`
    /// (register_stream / send_data_stream).
    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,

    /// MediaTrack callback registry shared between WebRTC media tracks and
    /// `RuntimeContext` (register_media_track / send_media_sample).
    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,

    /// Signaling client.
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID (obtained after startup).
    ///
    /// `handle_incoming` refuses to process messages while this is `None`.
    pub(crate) actor_id: Option<ActrId>,

    /// Actor credential (obtained after startup, used for subsequent
    /// authentication messages).
    ///
    /// `handle_incoming` refuses to process messages while this is `None`.
    pub(crate) credential_state: Option<CredentialState>,

    /// WebRTC coordinator (created after startup).
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    /// Peer transport manager (created after startup).
    pub(crate) peer_transport: Option<Arc<crate::transport::PeerTransport>>,

    /// WebRTC Gate (created after startup).
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// WebSocket Gate (direct-connect mode inbound, optional).
    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,

    /// Shell → Workload transport (REQUEST direction).
    ///
    /// Workload receives REQUEST from Shell (zero serialization, direct
    /// `RpcEnvelope` passing).
    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,

    /// Workload → Shell transport (RESPONSE direction).
    ///
    /// Workload sends RESPONSE to Shell (separate pending_requests from
    /// Shell's).
    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,

    /// Shutdown token for graceful shutdown.
    pub(crate) shutdown_token: CancellationToken,

    /// Packaged manifest.lock.toml content loaded at startup for fingerprint lookups.
    ///
    /// Wrapped in `Arc` so per-request `RuntimeContext` clones only bump a refcount
    /// instead of deep-cloning the dependency vector.
    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,

    /// Network event receiver (from NetworkEventHandle).
    pub(crate) network_event_rx:
        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>>,

    /// Network event debounce configuration.
    pub(crate) network_event_debounce_config:
        Option<crate::lifecycle::network_event::DebounceConfig>,

    /// Request deduplication state (15 s TTL response cache, prevents double-processing on retry).
    ///
    /// NOTE(review): the "15 s" figure is presumably the value of `DEDUP_TTL`;
    /// confirm against `crate::lifecycle::dedup`.
    pub(crate) dedup_state: Arc<Mutex<DedupState>>,

    /// Verified package manifest for package-backed nodes.
    // NOTE(review): `dead_code` is allowed here presumably because the field is
    // only read in some build configurations — confirm.
    #[allow(dead_code)]
    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,

    /// Pre-issued registration credential injected by the Hyper layer during
    /// the `Attached → Registered` state transition. `start()` uses it directly
    /// instead of re-registering with the signaling server.
    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,

    /// Shared WebSocket direct-connect address map populated by discovery.
    ///
    /// Shared with `DefaultWireBuilder` so discovered ws:// URLs can be reused
    /// directly instead of relying on a static url_template.
    /// The map is keyed by `ActrId`.
    pub(crate) discovered_ws_addresses:
        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,

    /// Runtime workload (WASM, dynclib, etc.)
    ///
    /// `handle_incoming` dispatches through this workload.
    ///
    /// The `Mutex` serializes dispatch into a single guest actor instance:
    /// `WasmWorkload::handle` and `DynClibWorkload::handle` both take
    /// `&mut self` because the underlying Wasmtime `Store` / native guest
    /// ABI is single-threaded, so concurrent dispatch through the same
    /// instance would be unsound. Lifecycle hooks also take this lock because
    /// package-backed WASM / dynclib workloads expose them on the same guest
    /// instance; transport and other observation hooks reach linked workloads
    /// through `hook_observer` without holding this lock.
    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,

    /// Optional shell-side observer that receives linked-workload transport /
    /// credential / mailbox hook invocations.
    ///
    /// `None` means "no observer installed"; the built-in tracing defaults
    /// still fire from the event-source wiring sites. When `Some`, hook
    /// invocations are dispatched through `lifecycle::hooks::spawn_hook`
    /// so panics in observer code cannot unwind into the event source.
    // NOTE(review): `build()` always sets this to `None`; `dead_code` is
    // presumably allowed because installation happens elsewhere — confirm.
    #[allow(dead_code)]
    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,

    /// Queue-length threshold at which the mailbox backpressure
    /// watchdog fires the framework `on_mailbox_backpressure` hook.
    ///
    /// Resolved from [`HyperConfig`] at node construction time so the
    /// runtime loop does not need to hold a reference back to `HyperConfig`.
    pub(crate) mailbox_backpressure_threshold: usize,

    /// Lead time before credential expiry at which the framework fires
    /// the `on_credential_expiring` hook. Resolved from [`HyperConfig`]
    /// at node construction time.
    // NOTE(review): not read anywhere in the visible part of this file;
    // `dead_code` is presumably allowed until the expiry watchdog uses it.
    #[allow(dead_code)]
    pub(crate) credential_expiry_warning: Duration,
}
175
176/// Credential state for shared access between tasks
177#[derive(Clone)]
178pub struct CredentialState {
179 inner: Arc<RwLock<CredentialStateInner>>,
180}
181
182#[derive(Clone)]
183struct CredentialStateInner {
184 credential: AIdCredential,
185 expires_at: Option<prost_types::Timestamp>,
186 /// HMAC time-limited TURN credential, updated together with credential on registration/renewal
187 turn_credential: Option<TurnCredential>,
188}
189
190impl CredentialState {
191 /// Create a new CredentialState with TURN credential
192 pub fn new(
193 credential: AIdCredential,
194 expires_at: Option<prost_types::Timestamp>,
195 turn_credential: Option<TurnCredential>,
196 ) -> Self {
197 Self {
198 inner: Arc::new(RwLock::new(CredentialStateInner {
199 credential,
200 expires_at,
201 turn_credential,
202 })),
203 }
204 }
205
206 pub async fn credential(&self) -> AIdCredential {
207 self.inner.read().await.credential.clone()
208 }
209
210 pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
211 self.inner.read().await.expires_at
212 }
213
214 /// Get TURN credential (HMAC time-limited credential)
215 pub async fn turn_credential(&self) -> Option<TurnCredential> {
216 self.inner.read().await.turn_credential.clone()
217 }
218
219 /// Update credential and TURN credential
220 ///
221 /// Called on credential renewal; only overwrites the old TURN credential when the new one is not empty
222 pub(crate) async fn update(
223 &self,
224 credential: AIdCredential,
225 expires_at: Option<prost_types::Timestamp>,
226 turn_credential: Option<TurnCredential>,
227 ) {
228 let mut guard = self.inner.write().await;
229 guard.credential = credential;
230 guard.expires_at = expires_at;
231 if turn_credential.is_some() {
232 guard.turn_credential = turn_credential;
233 }
234 }
235}
236
237/// Host operation executor - routes guest outbound calls through RuntimeContext
238///
239/// Called by the workload dispatch path in `handle_incoming`.
240async fn host_operation_handler(
241 ctx: crate::context::RuntimeContext,
242 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
243 pending: crate::workload::HostOperation,
244) -> crate::workload::HostOperationResult {
245 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
246 use actr_framework::guest::dynclib_abi::code as abi_code;
247 use actr_framework::{Context as _, Dest};
248 use actr_protocol::{DataStream, PayloadType};
249
250 /// Map `ActrError` to ABI error code, preserving semantics for guest-side discrimination
251 fn actr_error_to_code(err: &ActrError) -> i32 {
252 match err {
253 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
254 _ => abi_code::GENERIC_ERROR,
255 }
256 }
257
258 match pending {
259 HostOperation::CallRaw(req) => {
260 match ctx
261 .call_raw(
262 &Dest::Actor(req.target),
263 req.route_key,
264 PayloadType::RpcReliable,
265 bytes::Bytes::from(req.payload),
266 30_000,
267 )
268 .await
269 {
270 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
271 Err(e) => {
272 tracing::error!("call_raw routing failed: {e:?}");
273 HostOperationResult::Error(actr_error_to_code(&e))
274 }
275 }
276 }
277
278 HostOperation::Call(req) => {
279 let dest = match decode_dest(&req.dest) {
280 Some(d) => d,
281 None => {
282 tracing::error!(route_key = req.route_key, "call: dest decode failed");
283 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
284 }
285 };
286 match ctx
287 .call_raw(
288 &dest,
289 req.route_key,
290 PayloadType::RpcReliable,
291 bytes::Bytes::from(req.payload),
292 30_000,
293 )
294 .await
295 {
296 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
297 Err(e) => {
298 tracing::error!("call routing failed: {e:?}");
299 HostOperationResult::Error(actr_error_to_code(&e))
300 }
301 }
302 }
303
304 HostOperation::Tell(req) => {
305 let dest = match decode_dest(&req.dest) {
306 Some(d) => d,
307 None => {
308 tracing::error!(route_key = req.route_key, "tell: dest decode failed");
309 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
310 }
311 };
312 match ctx
313 .tell_raw(
314 &dest,
315 req.route_key,
316 PayloadType::RpcReliable,
317 bytes::Bytes::from(req.payload),
318 )
319 .await
320 {
321 Ok(()) => HostOperationResult::Done,
322 Err(e) => {
323 tracing::error!("tell routing failed: {e:?}");
324 HostOperationResult::Error(actr_error_to_code(&e))
325 }
326 }
327 }
328
329 HostOperation::Discover(req) => {
330 match ctx.discover_route_candidate(&req.target_type).await {
331 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
332 Err(e) => {
333 tracing::error!("discover failed: {e:?}");
334 HostOperationResult::Error(actr_error_to_code(&e))
335 }
336 }
337 }
338
339 HostOperation::RegisterStream(req) => {
340 let stream_id = req.stream_id;
341 let callback_ctx = ctx.clone();
342 let callback_workload_dispatch = workload_dispatch.clone();
343 match ctx
344 .register_stream(stream_id, move |chunk: DataStream, sender| {
345 let ctx_for_executor = callback_ctx.clone();
346 let workload_dispatch = callback_workload_dispatch.clone();
347 Box::pin(async move {
348 let invocation = crate::workload::InvocationContext {
349 self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
350 caller_id: Some(sender.clone()),
351 request_id: format!(
352 "data-stream:{}:{}",
353 chunk.stream_id, chunk.sequence
354 ),
355 };
356 let call_executor: crate::workload::HostAbiFn =
357 std::sync::Arc::new(move |pending| {
358 let ctx = ctx_for_executor.clone();
359 Box::pin(async move {
360 stream_callback_host_operation_handler(ctx, pending).await
361 })
362 });
363 let mut guard = workload_dispatch.lock().await;
364 guard
365 .dispatch_data_stream(chunk, sender, invocation, &call_executor)
366 .await
367 })
368 })
369 .await
370 {
371 Ok(()) => HostOperationResult::Done,
372 Err(e) => {
373 tracing::error!("register_stream failed: {e:?}");
374 HostOperationResult::Error(actr_error_to_code(&e))
375 }
376 }
377 }
378
379 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
380 Ok(()) => HostOperationResult::Done,
381 Err(e) => {
382 tracing::error!("unregister_stream failed: {e:?}");
383 HostOperationResult::Error(actr_error_to_code(&e))
384 }
385 },
386
387 HostOperation::SendDataStream(req) => {
388 let dest = match decode_dest(&req.dest) {
389 Some(d) => d,
390 None => {
391 tracing::error!("send_data_stream: dest decode failed");
392 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
393 }
394 };
395 let payload_type = match PayloadType::try_from(req.payload_type) {
396 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
397 PayloadType::try_from(req.payload_type).expect("checked payload type")
398 }
399 Ok(other) => {
400 tracing::error!(?other, "send_data_stream: invalid stream payload type");
401 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
402 }
403 Err(_) => {
404 tracing::error!(
405 payload_type = req.payload_type,
406 "send_data_stream: unknown payload type"
407 );
408 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
409 }
410 };
411 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
412 Ok(()) => HostOperationResult::Done,
413 Err(e) => {
414 tracing::error!("send_data_stream failed: {e:?}");
415 HostOperationResult::Error(actr_error_to_code(&e))
416 }
417 }
418 }
419 }
420}
421
422fn lifecycle_invocation(
423 actor_id: &ActrId,
424 request_id: &'static str,
425) -> crate::workload::InvocationContext {
426 crate::workload::InvocationContext {
427 self_id: actor_id.clone(),
428 caller_id: None,
429 request_id: request_id.to_string(),
430 }
431}
432
433pub(crate) fn lifecycle_host_abi(
434 ctx: crate::context::RuntimeContext,
435 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
436) -> crate::workload::HostAbiFn {
437 std::sync::Arc::new(move |pending| {
438 let ctx = ctx.clone();
439 let workload_dispatch = workload_dispatch.clone();
440 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
441 })
442}
443
444async fn stream_callback_host_operation_handler(
445 ctx: crate::context::RuntimeContext,
446 pending: crate::workload::HostOperation,
447) -> crate::workload::HostOperationResult {
448 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
449 use actr_framework::guest::dynclib_abi::code as abi_code;
450 use actr_framework::{Context as _, Dest};
451 use actr_protocol::PayloadType;
452
453 fn actr_error_to_code(err: &ActrError) -> i32 {
454 match err {
455 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
456 _ => abi_code::GENERIC_ERROR,
457 }
458 }
459
460 match pending {
461 HostOperation::CallRaw(req) => {
462 match ctx
463 .call_raw(
464 &Dest::Actor(req.target),
465 req.route_key,
466 PayloadType::RpcReliable,
467 bytes::Bytes::from(req.payload),
468 30_000,
469 )
470 .await
471 {
472 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
473 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
474 }
475 }
476 HostOperation::Call(req) => {
477 let dest = match decode_dest(&req.dest) {
478 Some(d) => d,
479 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
480 };
481 match ctx
482 .call_raw(
483 &dest,
484 req.route_key,
485 PayloadType::RpcReliable,
486 bytes::Bytes::from(req.payload),
487 30_000,
488 )
489 .await
490 {
491 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
492 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
493 }
494 }
495 HostOperation::Tell(req) => {
496 let dest = match decode_dest(&req.dest) {
497 Some(d) => d,
498 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
499 };
500 match ctx
501 .tell_raw(
502 &dest,
503 req.route_key,
504 PayloadType::RpcReliable,
505 bytes::Bytes::from(req.payload),
506 )
507 .await
508 {
509 Ok(()) => HostOperationResult::Done,
510 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
511 }
512 }
513 HostOperation::Discover(req) => {
514 match ctx.discover_route_candidate(&req.target_type).await {
515 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
516 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
517 }
518 }
519 HostOperation::RegisterStream(_) => {
520 tracing::error!("register_stream from inside a stream callback is not supported");
521 HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
522 }
523 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
524 Ok(()) => HostOperationResult::Done,
525 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
526 },
527 HostOperation::SendDataStream(req) => {
528 let dest = match decode_dest(&req.dest) {
529 Some(d) => d,
530 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
531 };
532 let payload_type = match PayloadType::try_from(req.payload_type) {
533 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
534 PayloadType::try_from(req.payload_type).expect("checked payload type")
535 }
536 Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
537 };
538 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
539 Ok(()) => HostOperationResult::Done,
540 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
541 }
542 }
543 }
544}
545
546/// Map `ActrError` to a stable, unique numeric code for the wire
547/// `ErrorResponse`. Each variant has a distinct code so
548/// `wire_code_to_actr_error` can reconstruct the exact variant on the
549/// receiving side.
550///
551/// Code allocation (100xx namespace to avoid collisions with HTTP):
552///
553/// | code | variant |
554/// |-------|--------------------|
555/// | 10001 | Unavailable |
556/// | 10002 | TimedOut |
557/// | 10003 | NotFound |
558/// | 10004 | PermissionDenied |
559/// | 10005 | InvalidArgument |
560/// | 10006 | UnknownRoute |
561/// | 10007 | DependencyNotFound |
562/// | 10008 | DecodeFailure |
563/// | 10009 | NotImplemented |
564/// | 10010 | Internal |
565pub(crate) fn protocol_error_to_code(err: &ActrError) -> u32 {
566 match err {
567 ActrError::Unavailable(_) => 10001,
568 // ConnectionNotReady is a local send-preflight error and should not
569 // normally cross the wire; if it does, collapse it to Unavailable
570 // (same as the legacy 503 mapping did).
571 ActrError::ConnectionNotReady(_) => 10001,
572 ActrError::TimedOut => 10002,
573 ActrError::NotFound(_) => 10003,
574 ActrError::PermissionDenied(_) => 10004,
575 ActrError::InvalidArgument(_) => 10005,
576 ActrError::UnknownRoute(_) => 10006,
577 ActrError::DependencyNotFound { .. } => 10007,
578 ActrError::DecodeFailure(_) => 10008,
579 ActrError::NotImplemented(_) => 10009,
580 ActrError::Internal(_) => 10010,
581 }
582}
583
584/// Reconstruct an `ActrError` from a wire code + message pair.
585///
586/// This is the inverse of `protocol_error_to_code`. Unknown codes fall
587/// back to `ActrError::Unavailable` to avoid silent data loss.
588pub(crate) fn wire_code_to_actr_error(code: u32, message: String) -> ActrError {
589 match code {
590 10001 => ActrError::Unavailable(message),
591 10002 => ActrError::TimedOut,
592 10003 => ActrError::NotFound(message),
593 10004 => ActrError::PermissionDenied(message),
594 10005 => ActrError::InvalidArgument(message),
595 10006 => ActrError::UnknownRoute(message),
596 10007 => ActrError::DependencyNotFound {
597 service_name: String::new(),
598 message,
599 },
600 10008 => ActrError::DecodeFailure(message),
601 10009 => ActrError::NotImplemented(message),
602 10010 => ActrError::Internal(message),
603 // Legacy HTTP-ish codes emitted before this scheme was introduced,
604 // or any unknown future code: treat as Unavailable.
605 _ => ActrError::Unavailable(format!("rpc error {code}: {message}")),
606 }
607}
608
609impl Inner {
610 #[allow(dead_code)]
611 pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
612 self.package_manifest.as_ref()
613 }
614
615 /// Network event processing loop (background task)
616 ///
617 /// # Responsibilities
618 /// - Receive network events from Channel
619 /// - Delegate to NetworkEventProcessor for handling
620 /// - Record processing time and send results
621 async fn network_event_loop(
622 event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>,
623 event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
624 shutdown_token: CancellationToken,
625 ) {
626 crate::lifecycle::network_event::run_network_event_reconciler(
627 event_rx,
628 event_processor,
629 shutdown_token,
630 )
631 .await;
632 }
633
634 fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
635 if timeout_ms > 0 {
636 Duration::from_millis(timeout_ms as u64)
637 } else {
638 DEDUP_TTL
639 }
640 }
641
642 async fn wait_for_inflight_duplicate(
643 mut waiter: DedupWaiter,
644 timeout: Duration,
645 ) -> ActorResult<Bytes> {
646 let wait_for_result = async {
647 loop {
648 if let Some(result) = waiter.borrow().clone() {
649 return result;
650 }
651
652 if waiter.changed().await.is_err() {
653 if let Some(result) = waiter.borrow().clone() {
654 return result;
655 }
656 return Err(ActrError::Unavailable(
657 "duplicate request result unavailable".to_string(),
658 ));
659 }
660 }
661 };
662
663 match tokio::time::timeout(timeout, wait_for_result).await {
664 Ok(result) => result,
665 Err(_) => Err(ActrError::Unavailable(format!(
666 "duplicate request in-flight timed out after {}ms",
667 timeout.as_millis()
668 ))),
669 }
670 }
671
672 /// - Single-hop calls: effectively identical
673 /// - Multi-hop calls: trace_id spans all hops, request_id per hop
674 #[cfg_attr(
675 feature = "opentelemetry",
676 tracing::instrument(
677 skip_all,
678 name = "ActrNode.handle_incoming",
679 fields(
680 actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
681 route_key = %envelope.route_key,
682 request_id = %envelope.request_id,
683 )
684 )
685 )]
686 pub async fn handle_incoming(
687 &self,
688 envelope: RpcEnvelope,
689 caller_id: Option<&ActrId>,
690 ) -> ActorResult<Bytes> {
691 // Log received message
692 if let Some(caller) = caller_id {
693 tracing::debug!(
694 "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
695 envelope.route_key,
696 caller,
697 envelope.request_id
698 );
699 } else {
700 tracing::debug!(
701 "📨 Handling incoming message: route_key={}, request_id={}",
702 envelope.route_key,
703 envelope.request_id
704 );
705 }
706
707 // 0. Get actor_id early for ACL check
708 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
709 ActrError::Internal(
710 "Actor ID not set - node must be started before handling messages".to_string(),
711 )
712 })?;
713
714 // 0.1. ACL Permission Check (before processing message)
715 let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
716 .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
717
718 if !acl_allowed {
719 tracing::warn!(
720 severity = 5,
721 error_category = "acl_denied",
722 request_id = %envelope.request_id,
723 route_key = %envelope.route_key,
724 caller = %caller_id
725 .map(|c| c.to_string())
726 .unwrap_or_else(|| "<none>".to_string()),
727 "🚫 ACL: Permission denied"
728 );
729
730 return Err(ActrError::PermissionDenied(format!(
731 "ACL denied: {} is not allowed to call {}",
732 caller_id
733 .map(|c| c.to_string())
734 .unwrap_or_else(|| "<unknown>".to_string()),
735 actor_id
736 )));
737 }
738
739 // 0.2. Deduplication: return cached response for retried request_ids
740 let outcome = {
741 self.dedup_state
742 .lock()
743 .await
744 .check_or_mark(&envelope.request_id)
745 };
746 match outcome {
747 DedupOutcome::Fresh => {} // proceed normally
748 DedupOutcome::InFlight(waiter) => {
749 tracing::debug!(
750 request_id = %envelope.request_id,
751 route_key = %envelope.route_key,
752 "duplicate request in-flight; waiting for original result"
753 );
754 return Self::wait_for_inflight_duplicate(
755 waiter,
756 Self::duplicate_wait_timeout(envelope.timeout_ms),
757 )
758 .await;
759 }
760 DedupOutcome::Duplicate(cached) => {
761 tracing::debug!(
762 request_id = %envelope.request_id,
763 route_key = %envelope.route_key,
764 "♻️ returning cached response for duplicate request_id"
765 );
766 return cached;
767 }
768 }
769
770 // 1. Create Context with caller_id from transport layer
771 let credential_state = self.credential_state.clone().ok_or_else(|| {
772 ActrError::Internal(
773 "Credential not set - node must be started before handling messages".to_string(),
774 )
775 })?;
776 let ctx = self.make_runtime_context(
777 actor_id,
778 caller_id, // caller_id from transport layer (MessageRecord.from)
779 &envelope.request_id,
780 &credential_state.credential().await,
781 );
782
783 // 2. Dispatch
784 let dispatch_ctx = crate::workload::InvocationContext {
785 self_id: actor_id.clone(),
786 caller_id: caller_id.cloned(),
787 request_id: envelope.request_id.clone(),
788 };
789 let ctx_for_executor = ctx.clone();
790 let workload_for_executor = self.workload_dispatch.clone();
791 let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
792 let ctx = ctx_for_executor.clone();
793 let workload_dispatch = workload_for_executor.clone();
794 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
795 });
796
797 let mut guard = self.workload_dispatch.lock().await;
798 let result = guard
799 .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
800 .await;
801
802 match &result {
803 Ok(_) => tracing::debug!(
804 request_id = %envelope.request_id,
805 route_key = %envelope.route_key,
806 "✅ Message handled successfully"
807 ),
808 Err(e) => tracing::error!(
809 severity = 6,
810 error_category = "handler_error",
811 request_id = %envelope.request_id,
812 route_key = %envelope.route_key,
813 "❌ Message handling failed: {:?}", e
814 ),
815 }
816
817 // 3. Store completed result in dedup cache before returning
818 self.dedup_state
819 .lock()
820 .await
821 .complete(&envelope.request_id, result.clone());
822
823 result
824 }
825
826 /// Build a new `Inner` from config and runtime workload.
827 ///
828 /// This is the internal constructor behind the public node builders and
829 /// Hyper package attach helpers.
830 pub(crate) async fn build(
831 config: actr_config::RuntimeConfig,
832 workload: crate::workload::Workload,
833 package_manifest: Option<actr_pack::PackageManifest>,
834 packaged_lock: Option<actr_config::lock::LockFile>,
835 mailbox_backpressure_threshold: usize,
836 credential_expiry_warning: Duration,
837 ) -> ActorResult<Self> {
838 use crate::outbound::{Gate, HostGate};
839 use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
840
841 tracing::info!("🚀 Initializing ActrNode");
842
843 // Initialize Mailbox
844 let mailbox_path = config
845 .mailbox_path
846 .as_ref()
847 .map(|p| p.to_string_lossy().to_string())
848 .unwrap_or_else(|| ":memory:".to_string());
849
850 tracing::info!("📂 Mailbox database path: {}", mailbox_path);
851
852 let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
853 actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
854 .await
855 .map_err(|e| {
856 actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
857 })?,
858 );
859
860 // Initialize Dead Letter Queue
861 let dlq_path = if mailbox_path == ":memory:" {
862 ":memory:".to_string()
863 } else {
864 format!("{mailbox_path}.dlq")
865 };
866
867 let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
868 actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
869 .await
870 .map_err(|e| {
871 actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
872 })?,
873 );
874 tracing::info!("✅ Dead Letter Queue initialized");
875
876 // Initialize signaling client
877 let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
878 Some("answer".to_string())
879 } else {
880 None
881 };
882
883 let signaling_config = SignalingConfig {
884 server_url: config.signaling_url.clone(),
885 connection_timeout: 30,
886 heartbeat_interval: 30,
887 reconnect_config: ReconnectConfig::default(),
888 auth_config: None,
889 webrtc_role,
890 };
891
892 let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
893 client.start_reconnect_manager();
894 let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
895
896 // Initialize inproc infrastructure (Shell ↔ Guest)
897 let shell_to_workload = Arc::new(HostTransport::new());
898 let workload_to_shell = Arc::new(HostTransport::new());
899 let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
900
901 let data_stream_registry = Arc::new(DataStreamRegistry::new());
902 let media_frame_registry = Arc::new(MediaFrameRegistry::new());
903
904 tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
905
906 let actr_lock = if let Some(lock) = packaged_lock {
907 tracing::info!(
908 "📋 Loaded packaged manifest.lock.toml with {} dependencies",
909 lock.dependencies.len()
910 );
911 Some(Arc::new(lock))
912 } else {
913 tracing::warn!(
914 "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
915 );
916 None
917 };
918
919 tracing::info!("✅ ActrNode initialized");
920
921 Ok(Self {
922 config,
923 mailbox,
924 dlq,
925 inproc_gate,
926 outproc_gate: None, // Populated in start() once WebRTC / PeerGate is ready.
927 data_stream_registry,
928 media_frame_registry,
929 signaling_client,
930 actor_id: None,
931 credential_state: None,
932 webrtc_coordinator: None,
933 peer_transport: None,
934 webrtc_gate: None,
935 websocket_gate: None,
936 shell_to_workload: Some(shell_to_workload),
937 workload_to_shell: Some(workload_to_shell),
938 shutdown_token: CancellationToken::new(),
939 actr_lock,
940 network_event_rx: None,
941 network_event_debounce_config: None,
942 dedup_state: Arc::new(Mutex::new(DedupState::new())),
943 package_manifest,
944 preregistered_credential: None,
945 discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
946 std::collections::HashMap::new(),
947 )),
948 workload_dispatch: Arc::new(Mutex::new(workload)),
949 hook_observer: None,
950 mailbox_backpressure_threshold,
951 credential_expiry_warning,
952 })
953 }
954
955 /// Snapshot the current runtime handles into a `BootstrapContextBuilder`.
956 ///
957 /// The returned builder is cloned into long-lived hook closures and into
958 /// `ActrRefShared` so those paths can materialize bootstrap contexts
959 /// without retaining a reference back to `Inner`. The snapshot freezes
960 /// `outproc_gate` and `actr_lock` at call time — callers that want to
961 /// observe a later-initialized `outproc_gate` must rebuild.
962 pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
963 BootstrapContextBuilder::new(
964 self.inproc_gate.clone(),
965 self.outproc_gate.clone(),
966 self.data_stream_registry.clone(),
967 self.media_frame_registry.clone(),
968 self.signaling_client.clone(),
969 self.actr_lock.clone(),
970 self.discovered_ws_addresses.clone(),
971 )
972 }
973
974 /// Build a `RuntimeContext` for the per-request dispatch path.
975 ///
976 /// Unlike `BootstrapContextBuilder::build_bootstrap`, this carries the
977 /// envelope's caller identity and request id through into the context.
978 pub(crate) fn make_runtime_context(
979 &self,
980 self_id: &ActrId,
981 caller_id: Option<&ActrId>,
982 request_id: &str,
983 credential: &AIdCredential,
984 ) -> RuntimeContext {
985 RuntimeContext::new(
986 self_id.clone(),
987 caller_id.cloned(),
988 request_id.to_string(),
989 self.inproc_gate.clone(),
990 self.outproc_gate.clone(),
991 self.data_stream_registry.clone(),
992 self.media_frame_registry.clone(),
993 self.signaling_client.clone(),
994 credential.clone(),
995 self.actr_lock.clone(),
996 self.discovered_ws_addresses.clone(),
997 )
998 }
999
1000 /// Create network event processing infrastructure (called on demand, before `start()`).
1001 ///
1002 /// # Parameters
1003 /// - `debounce_ms`: Debounce window in milliseconds. If 0, no debounce.
1004 ///
1005 /// # Panics
1006 /// Panics if called more than once.
1007 pub fn create_network_event_handle(
1008 &mut self,
1009 debounce_ms: u64,
1010 ) -> crate::lifecycle::NetworkEventHandle {
1011 if self.network_event_rx.is_some() {
1012 panic!("create_network_event_handle() can only be called once");
1013 }
1014
1015 let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
1016
1017 let debounce_config = if debounce_ms > 0 {
1018 Some(crate::lifecycle::network_event::DebounceConfig {
1019 window: std::time::Duration::from_millis(debounce_ms),
1020 })
1021 } else {
1022 None
1023 };
1024
1025 self.network_event_rx = Some(event_rx);
1026 self.network_event_debounce_config = debounce_config;
1027
1028 tracing::info!(
1029 debounce_ms,
1030 channel_capacity = 100_u64,
1031 "network_event.node.handle_created"
1032 );
1033
1034 crate::lifecycle::NetworkEventHandle::new(event_tx)
1035 }
1036
1037 /// Attach a credential already issued by AIS so that `start()` can skip
1038 /// the signaling registration step.
1039 ///
1040 /// Called by the Hyper layer between `Hyper::register()` and `Hyper::start()`.
1041 pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
1042 tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
1043 self.preregistered_credential = Some(register_ok);
1044 }
1045
1046 /// Start the system
1047 pub async fn start(mut self) -> ActorResult<ActrRef> {
1048 tracing::info!("🚀 Starting ActrNode");
1049 tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1050
1051 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1052 // 1. Build RegisterRequest
1053 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1054 // Get ActrType from configuration
1055 let actr_type = self.config.actr_type().clone();
1056 tracing::info!("📋 Actor type: {}", actr_type);
1057
1058 // ServiceSpec is derived by the Hyper layer from the verified package
1059 // (see `service_spec::calculate_service_spec_from_package`). The raw
1060 // ActrNode::start() path has no package context and always sends None
1061 // on its own RegisterRequest; callers that need a spec must go
1062 // through `Hyper::register()`.
1063 let service_spec = None;
1064
1065 // If a WebSocket listen port is configured, build the advertised ws:// address
1066 // to register with the signaling server so clients can discover it.
1067 let ws_address = if let Some(port) = self.config.websocket_listen_port {
1068 let host = self
1069 .config
1070 .websocket_advertised_host
1071 .as_deref()
1072 .unwrap_or("127.0.0.1");
1073 Some(format!("ws://{}:{}", host, port))
1074 } else {
1075 None
1076 };
1077
1078 if let Some(ref addr) = ws_address {
1079 tracing::info!(
1080 "📡 Advertising WebSocket address to signaling server: {}",
1081 addr
1082 );
1083 }
1084
1085 let register_request = RegisterRequest {
1086 actr_type: actr_type.clone(),
1087 realm: self.config.realm,
1088 service_spec,
1089 acl: self.config.acl.clone(),
1090 service: None,
1091 ws_address,
1092 auth_mode: Some(RegisterAuthMode::Linked as i32),
1093 ..Default::default()
1094 };
1095
1096 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1097 // 1. Obtain registration info (Hyper pre-injected or AIS HTTP)
1098 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1099 let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1100 tracing::info!(
1101 "Using Hyper pre-injected registration credential; skipping AIS registration"
1102 );
1103 injected
1104 } else {
1105 let ais_endpoint = &self.config.ais_endpoint;
1106 tracing::info!(
1107 ais_endpoint = %ais_endpoint,
1108 "Registering actor with AIS via HTTP"
1109 );
1110 let mut ais = AisClient::new(ais_endpoint);
1111 if let Some(ref secret) = self.config.realm_secret {
1112 ais = ais.with_realm_secret(secret);
1113 }
1114 let resp = ais
1115 .register_linked(register_request.clone())
1116 .await
1117 .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1118 match resp.result {
1119 Some(register_response::Result::Success(ok)) => {
1120 tracing::info!("✅ AIS HTTP registration successful");
1121 ok
1122 }
1123 Some(register_response::Result::Error(error)) => {
1124 tracing::error!(
1125 severity = 10,
1126 error_category = "registration_error",
1127 error_code = error.code,
1128 "❌ AIS registration failed: code={}, message={}",
1129 error.code,
1130 error.message
1131 );
1132 return Err(ActrError::Unavailable(format!(
1133 "AIS registration rejected: {} (code: {})",
1134 error.message, error.code
1135 )));
1136 }
1137 None => {
1138 tracing::error!(
1139 severity = 10,
1140 error_category = "registration_error",
1141 "❌ AIS registration response missing result"
1142 );
1143 return Err(ActrError::Unavailable(
1144 "Invalid AIS registration response: missing result".to_string(),
1145 ));
1146 }
1147 }
1148 };
1149
1150 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1151 // 3. Set credential on signaling client, then connect signaling WS
1152 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1153 // The signaling server requires credential params in the WS URL for
1154 // authentication. We must set actor_id + credential BEFORE connecting
1155 // so that build_url_with_identity() includes them in the query string.
1156 let pre_connect_credential_state = {
1157 let actor_id = register_ok.actr_id.clone();
1158 let credential_state = CredentialState::new(
1159 register_ok.credential.clone(),
1160 register_ok.credential_expires_at,
1161 Some(register_ok.turn_credential.clone()),
1162 );
1163 self.signaling_client.set_actor_id(actor_id).await;
1164 self.signaling_client
1165 .set_credential_state(credential_state.clone())
1166 .await;
1167 credential_state
1168 };
1169
1170 // Install the signaling-side hook callback so that
1171 // SignalingConnectStart / Connected / Disconnected events flow
1172 // through the framework tracing defaults and into a
1173 // user-installed observer. Done BEFORE connect() so the initial
1174 // attempt produces a SignalingConnectStart event.
1175 {
1176 let actor_id = register_ok.actr_id.clone();
1177 let credential_state = pre_connect_credential_state.clone();
1178 // Snapshot at this point — outproc_gate is still None here, so
1179 // signaling-event contexts will carry None for outproc_gate
1180 // (matching the pre-existing behavior prior to B13 refactor).
1181 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1182 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1183 let snapshot = ctx_builder_snapshot.clone();
1184 let actor_id = actor_id.clone();
1185 let credential_state = credential_state.clone();
1186 Box::pin(async move {
1187 Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1188 })
1189 });
1190 let cb = crate::lifecycle::hooks::build_hook_callback(
1191 self.hook_observer.clone(),
1192 ctx_builder,
1193 );
1194 self.signaling_client.set_hook_callback(cb);
1195 }
1196
1197 tracing::info!("📡 Connecting to signaling server (with credential)");
1198 self.signaling_client
1199 .connect()
1200 .await
1201 .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1202 tracing::info!("✅ Connected to signaling server");
1203
1204 // Collect background task handles so they can be managed by ActrRefShared later.
1205 let mut task_handles = Vec::new();
1206
1207 // Node-level hook callback, built inside the registration
1208 // setup block below and published back out into this wider
1209 // scope so the mailbox backpressure watchdog can subscribe.
1210 let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1211
1212 {
1213 let actor_id = register_ok.actr_id;
1214 let credential = register_ok.credential;
1215
1216 tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1217 tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1218 tracing::info!(
1219 "💓 Signaling heartbeat interval: {} seconds",
1220 register_ok.signaling_heartbeat_interval_secs
1221 );
1222
1223 // TurnCredential is a required field; should always be present under normal registration.
1224 tracing::debug!("TurnCredential received, TURN authentication ready");
1225
1226 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1227 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1228 }
1229
1230 // Store ActrId and credential state
1231 self.actor_id = Some(actor_id.clone());
1232 let credential_state = CredentialState::new(
1233 credential,
1234 register_ok.credential_expires_at,
1235 Some(register_ok.turn_credential.clone()),
1236 );
1237 self.credential_state = Some(credential_state.clone());
1238
1239 // Build the node-level lifecycle hook callback once: it is
1240 // reused for the initial `on_credential_renewed`, handed to
1241 // the heartbeat task for subsequent credential events, and
1242 // handed to the mailbox backpressure watchdog for
1243 // `on_mailbox_backpressure` on rising-edge crossings.
1244 //
1245 // The signaling layer already has its own callback installed
1246 // above — this second callback only carries credential and
1247 // mailbox-backpressure events, so no overlap with the
1248 // signaling-event plumbing.
1249 node_hook_callback =
1250 {
1251 let actor_id_for_hook = actor_id.clone();
1252 let credential_state_for_hook = credential_state.clone();
1253 // Snapshot at this point — outproc_gate is still None
1254 // here; credential / mailbox hook contexts inherit that
1255 // and therefore cannot issue Dest::Actor(_) calls (same
1256 // behavior as before B13 refactor).
1257 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1258 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1259 Arc::new(move || {
1260 let snapshot = ctx_builder_snapshot.clone();
1261 let actor_id = actor_id_for_hook.clone();
1262 let credential_state = credential_state_for_hook.clone();
1263 Box::pin(async move {
1264 Some(snapshot.build_bootstrap(
1265 &actor_id,
1266 &credential_state.credential().await,
1267 ))
1268 })
1269 });
1270 Some(crate::lifecycle::hooks::build_hook_callback(
1271 self.hook_observer.clone(),
1272 ctx_builder,
1273 ))
1274 };
1275
1276 // Fire `on_credential_renewed` at initial registration: the
1277 // credential is considered "renewed" from "nothing" to the
1278 // value just issued by AIS. Subsequent renewals fire the
1279 // same hook from `lifecycle::heartbeat`.
1280 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1281 let new_expiry = std::time::UNIX_EPOCH
1282 + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1283 if let Some(cb) = node_hook_callback.as_ref() {
1284 cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1285 } else {
1286 tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1287 }
1288 }
1289
1290 // Note: actor_id and credential_state were already set on signaling_client
1291 // before connect (step 3 above), so reconnect URLs already carry correct auth.
1292
1293 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1294 // 1.3. Inproc transports were filled in during `build()`; nothing
1295 // to stage here now that ContextFactory has been removed.
1296 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1297 tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1298
1299 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1300 // 1.5. Create WebRTC infrastructure
1301 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1302 tracing::info!("🌐 Initializing WebRTC infrastructure");
1303
1304 let media_frame_registry = self.media_frame_registry.clone();
1305
1306 // Create WebRtcCoordinator
1307 let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1308 actor_id.clone(),
1309 credential_state.clone(),
1310 self.signaling_client.clone(),
1311 self.config.webrtc.clone(),
1312 media_frame_registry,
1313 ));
1314
1315 // Install the WebRTC hook callback — fires
1316 // WebRtcConnectStart / Connected (with relayed info) /
1317 // Disconnected HookEvents on every peer state change.
1318 {
1319 let actor_id_for_hook = actor_id.clone();
1320 let credential_state_for_hook = credential_state.clone();
1321 // Snapshot before outproc_gate is wired up (just below). This
1322 // preserves the pre-refactor behavior where WebRTC-event
1323 // hook contexts carry outproc_gate = None.
1324 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1325 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1326 Arc::new(move || {
1327 let snapshot = ctx_builder_snapshot.clone();
1328 let actor_id = actor_id_for_hook.clone();
1329 let credential_state = credential_state_for_hook.clone();
1330 Box::pin(async move {
1331 Some(
1332 snapshot.build_bootstrap(
1333 &actor_id,
1334 &credential_state.credential().await,
1335 ),
1336 )
1337 })
1338 });
1339 let cb = crate::lifecycle::hooks::build_hook_callback(
1340 self.hook_observer.clone(),
1341 ctx_builder,
1342 );
1343 coordinator.set_hook_callback(cb);
1344 }
1345
1346 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1347 // 1.6. Create PeerTransport + PeerGate (new architecture)
1348 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1349 tracing::info!("🏗️ Creating PeerTransport with WebRTC support");
1350
1351 // Pre-allocate the pending-requests map so it can be shared between
1352 // DefaultWireBuilder (for outbound WS response reader tasks) and
1353 // PeerGate (for request/response matching).
1354 let pending_requests: crate::outbound::PendingRequestsMap =
1355 Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
1356
1357 // Create DefaultWireBuilder with WebRTC coordinator
1358 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1359
1360 // WebSocket channel always enabled: target ws:// address is fully discovered at runtime
1361 // Direct-connect mode: encode local node ActrId as hex, sent as X-Actr-Node-Id
1362 let local_id_hex = hex::encode(actor_id.encode_to_vec());
1363 let wire_builder_config = DefaultWireBuilderConfig {
1364 local_id_hex,
1365 enable_webrtc: true,
1366 enable_websocket: true,
1367 // Share the discovered_ws_addresses map so that post-discovery calls
1368 // can use the signaling-provided ws:// URL for this actor node.
1369 discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1370 // Pass credential_state so outbound WS handshake carries X-Actr-Credential,
1371 // enabling peer WebSocketGate to perform Ed25519 signature verification.
1372 credential_state: Some(credential_state.clone()),
1373 // Pass pending_requests so outbound WS connections spawn reader tasks
1374 // to deliver server responses back to `send_request_with_type` futures.
1375 pending_requests: Some(pending_requests.clone()),
1376 };
1377 let wire_builder = Arc::new(DefaultWireBuilder::new(
1378 Some(coordinator.clone()),
1379 wire_builder_config,
1380 ));
1381
1382 // Create PeerTransport
1383 use crate::transport::PeerTransport;
1384 let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1385 self.peer_transport = Some(transport_manager.clone());
1386
1387 // Create PeerGate with the pre-allocated pending_requests map and WebRTC coordinator.
1388 use crate::outbound::PeerGate;
1389 let outproc_gate = Arc::new(PeerGate::with_pending_requests(
1390 transport_manager,
1391 Some(coordinator.clone()),
1392 pending_requests.clone(),
1393 ));
1394 let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1395 tracing::info!("PeerTransport + PeerGate initialized");
1396
1397 let data_stream_registry = self.data_stream_registry.clone();
1398
1399 // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1400 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1401 coordinator.clone(),
1402 pending_requests,
1403 data_stream_registry.clone(),
1404 ));
1405 // Set local_id
1406 gate.set_local_id(actor_id.clone()).await;
1407 tracing::info!(
1408 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1409 );
1410
1411 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1412 // 1.7. Wire the outproc gate into Inner so subsequent
1413 // `make_runtime_context` / `bootstrap_ctx_builder` calls
1414 // observe it. All per-request contexts created by
1415 // `handle_incoming` go through this field live.
1416 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1417 tracing::info!("🔧 Wiring outproc_gate into node");
1418 self.outproc_gate = Some(outproc_gate_enum);
1419 tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1420
1421 // Save references
1422 self.webrtc_coordinator = Some(coordinator.clone());
1423 self.webrtc_gate = Some(gate.clone());
1424 tracing::info!("✅ WebRTC infrastructure initialized");
1425
1426 // Fire `on_start` once the runtime context can see the initialized
1427 // gates, before starting request-accepting/background loops. Its
1428 // Err/panic aborts Node::start.
1429 {
1430 let startup_ctx = self
1431 .bootstrap_ctx_builder()
1432 .build_bootstrap(&actor_id, &credential_state.credential().await);
1433 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1434 let call_executor =
1435 lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1436 let mut workload = self.workload_dispatch.lock().await;
1437 crate::lifecycle::hooks::call_lifecycle_hook(
1438 "on_start",
1439 workload.on_start(startup_ctx, invocation, &call_executor),
1440 )
1441 .await?;
1442 }
1443
1444 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1445 // 1.7.6. WebSocket Server (direct-connect mode, optional)
1446 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1447 if let Some(listen_port) = self.config.websocket_listen_port {
1448 tracing::info!(
1449 "🔌 WebSocket direct-connect mode enabled, binding port {}",
1450 listen_port
1451 );
1452 use crate::key_cache::AisKeyCache;
1453 use crate::wire::websocket::gate::WsAuthContext;
1454 use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1455
1456 // Build AisKeyCache and seed it with the signing key from the registration response
1457 let ais_key_cache = AisKeyCache::new();
1458 if !register_ok.signing_pubkey.is_empty() {
1459 match ais_key_cache
1460 .seed(register_ok.signing_key_id, ®ister_ok.signing_pubkey)
1461 .await
1462 {
1463 Ok(()) => tracing::info!(
1464 key_id = register_ok.signing_key_id,
1465 "🔑 AisKeyCache seeded from RegisterOk"
1466 ),
1467 Err(e) => tracing::warn!(
1468 key_id = register_ok.signing_key_id,
1469 error = ?e,
1470 "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1471 ),
1472 }
1473 } else {
1474 tracing::warn!(
1475 "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1476 );
1477 }
1478
1479 let auth_ctx = WsAuthContext {
1480 ais_key_cache,
1481 actor_id: actor_id.clone(),
1482 credential_state: credential_state.clone(),
1483 signaling_client: self.signaling_client.clone(),
1484 };
1485
1486 match WebSocketServer::bind(listen_port).await {
1487 Ok((ws_server, conn_rx)) => {
1488 ws_server.start(self.shutdown_token.clone());
1489 let ws_gate = Arc::new(WebSocketGate::new(
1490 conn_rx,
1491 outproc_gate.get_pending_requests(),
1492 data_stream_registry.clone(),
1493 Some(auth_ctx),
1494 ));
1495
1496 // Install the WebSocket peer-lifecycle hook.
1497 {
1498 let actor_id_for_hook = actor_id.clone();
1499 let credential_state_for_hook = credential_state.clone();
1500 // Snapshot taken after outproc_gate is live: ws
1501 // peer-lifecycle hook contexts can issue
1502 // Dest::Actor(_) calls.
1503 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1504 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1505 Arc::new(move || {
1506 let snapshot = ctx_builder_snapshot.clone();
1507 let actor_id = actor_id_for_hook.clone();
1508 let credential_state = credential_state_for_hook.clone();
1509 Box::pin(async move {
1510 Some(snapshot.build_bootstrap(
1511 &actor_id,
1512 &credential_state.credential().await,
1513 ))
1514 })
1515 });
1516 let cb = crate::lifecycle::hooks::build_hook_callback(
1517 self.hook_observer.clone(),
1518 ctx_builder,
1519 );
1520 ws_gate.set_hook_callback(cb);
1521 }
1522
1523 self.websocket_gate = Some(ws_gate);
1524 tracing::info!(
1525 "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1526 );
1527 }
1528 Err(e) => {
1529 tracing::error!(
1530 "❌ Failed to bind WebSocket server on port {}: {:?}",
1531 listen_port,
1532 e
1533 );
1534 }
1535 }
1536 }
1537
1538 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1539 // 1.7.5. Create shared state for credential management
1540 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1541 // Shared credential state initialized above; reused across tasks
1542
1543 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1544 // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1545 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1546 {
1547 let shutdown = self.shutdown_token.clone();
1548 let client = self.signaling_client.clone();
1549 let actor_id_for_heartbeat = actor_id.clone();
1550 let credential_state_for_heartbeat = credential_state.clone();
1551 let mailbox_for_heartbeat = self.mailbox.clone();
1552 let register_request_for_heartbeat = register_request.clone();
1553 let webrtc_coordinator_for_heartbeat = self.webrtc_coordinator.clone();
1554 let webrtc_gate_for_heartbeat = self.webrtc_gate.clone();
1555
1556 // Use interval from registration response, default to 30s
1557 let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1558 let heartbeat_interval = if heartbeat_interval_secs > 0 {
1559 Duration::from_secs(heartbeat_interval_secs as u64)
1560 } else {
1561 Duration::from_secs(30)
1562 };
1563 let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1564 let realm_secret_for_heartbeat = self.config.realm_secret.clone();
1565 let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1566 shutdown,
1567 client,
1568 actor_id_for_heartbeat,
1569 credential_state_for_heartbeat,
1570 mailbox_for_heartbeat,
1571 heartbeat_interval,
1572 register_request_for_heartbeat,
1573 ais_endpoint_for_heartbeat,
1574 realm_secret_for_heartbeat,
1575 node_hook_callback.clone(),
1576 webrtc_coordinator_for_heartbeat,
1577 webrtc_gate_for_heartbeat,
1578 ));
1579 task_handles.push(heartbeat_handle);
1580 }
1581 tracing::info!(
1582 "✅ Heartbeat task started (interval: {}s)",
1583 register_ok.signaling_heartbeat_interval_secs
1584 );
1585
1586 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1587 // 1.8.5. Spawn network event processing loop
1588 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1589 if let Some(event_rx) = self.network_event_rx.take() {
1590 use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1591
1592 // Create DefaultNetworkEventProcessor
1593 // If debounce config exists, use new_with_debounce
1594 let event_processor =
1595 if let Some(config) = self.network_event_debounce_config.clone() {
1596 Arc::new(
1597 DefaultNetworkEventProcessor::new_with_debounce_and_peer_transport(
1598 self.signaling_client.clone(),
1599 self.webrtc_coordinator.clone(),
1600 config,
1601 self.peer_transport.clone(),
1602 ),
1603 )
1604 } else {
1605 Arc::new(DefaultNetworkEventProcessor::new_with_peer_transport(
1606 self.signaling_client.clone(),
1607 self.webrtc_coordinator.clone(),
1608 self.peer_transport.clone(),
1609 ))
1610 };
1611
1612 let shutdown = self.shutdown_token.clone();
1613 let network_event_handle = tokio::spawn(async move {
1614 Self::network_event_loop(event_rx, event_processor, shutdown).await;
1615 });
1616 task_handles.push(network_event_handle);
1617 tracing::info!("network_event.node.loop_started");
1618 } else {
1619 tracing::debug!("network_event.node.loop_not_started_no_handle");
1620 }
1621
1622 {
1623 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1624 // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1625 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1626 //
1627 // This task:
1628 // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1629 // - Then sends UnregisterRequest via signaling client with a timeout
1630 //
1631 // NOTE: we push its JoinHandle into task_handles so it can be aborted
1632 // by ActrRefShared::Drop if needed.
1633 let shutdown = self.shutdown_token.clone();
1634 let client = self.signaling_client.clone();
1635 let actor_id_for_unreg = actor_id.clone();
1636 let credential_state_for_unreg = credential_state.clone();
1637 let webrtc_coordinator = self.webrtc_coordinator.clone();
1638
1639 let unregister_handle = tokio::spawn(async move {
1640 // Wait for shutdown signal
1641 shutdown.cancelled().await;
1642 tracing::info!(
1643 "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1644 actor_id_for_unreg
1645 );
1646
1647 // 1. Close all WebRTC peer connections first (if any)
1648 if let Some(coord) = webrtc_coordinator {
1649 if let Err(e) = coord.close_all_peers().await {
1650 tracing::warn!(
1651 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1652 e
1653 );
1654 } else {
1655 tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1656 }
1657 } else {
1658 tracing::debug!(
1659 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1660 );
1661 }
1662
1663 // 2. Then send UnregisterRequest with a timeout (e.g. 5 seconds)
1664 let result = tokio::time::timeout(
1665 Duration::from_secs(5),
1666 client.send_unregister_request(
1667 actor_id_for_unreg.clone(),
1668 credential_state_for_unreg.credential().await,
1669 Some("Graceful shutdown".to_string()),
1670 ),
1671 )
1672 .await;
1673 tracing::info!("UnregisterRequest result: {:?}", result);
1674 match result {
1675 Ok(Ok(_)) => {
1676 tracing::info!(
1677 "✅ UnregisterRequest sent to signaling server for Actor {}",
1678 actor_id_for_unreg
1679 );
1680 }
1681 Ok(Err(e)) => {
1682 tracing::warn!(
1683 "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1684 actor_id_for_unreg,
1685 e
1686 );
1687 }
1688 Err(_) => {
1689 tracing::warn!(
1690 "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1691 actor_id_for_unreg
1692 );
1693 }
1694 }
1695 });
1696
1697 task_handles.push(unregister_handle);
1698 }
1699 } // end registration setup block
1700
1701 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1702 // 2. Transport layer initialization (completed via WebRTC infrastructure)
1703 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1704 tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1705
1706 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1707 // 3.1 Convert to Arc (before starting background loops)
1708 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1709 // Clone actor_id before moving self into Arc
1710 let actor_id = self
1711 .actor_id
1712 .as_ref()
1713 .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1714 .clone();
1715 // Snapshot now that outproc_gate has been wired above; this builder
1716 // is shared between on_start / on_stop hooks and the ActrRefShared
1717 // handle returned to the caller.
1718 let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1719 let credential_state = self
1720 .credential_state
1721 .clone()
1722 .expect("CredentialState must be initialized in start()");
1723 let shutdown_token = self.shutdown_token.clone();
1724 let node_ref = Arc::new(self);
1725
1726 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1727 // 3.2. Register workload-level stop hook.
1728 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1729 {
1730 let node = node_ref.clone();
1731 let actor_id = actor_id.clone();
1732 let credential_state = credential_state.clone();
1733 let shutdown = shutdown_token.clone();
1734 let on_stop_handle = tokio::spawn(async move {
1735 shutdown.cancelled().await;
1736 let stop_ctx = node
1737 .bootstrap_ctx_builder()
1738 .build_bootstrap(&actor_id, &credential_state.credential().await);
1739 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1740 let call_executor =
1741 lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1742 let mut workload = node.workload_dispatch.lock().await;
1743 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1744 "on_stop",
1745 workload.on_stop(stop_ctx, invocation, &call_executor),
1746 )
1747 .await
1748 {
1749 tracing::warn!(error = %e, "workload on_stop returned Err");
1750 }
1751 });
1752 task_handles.push(on_stop_handle);
1753 }
1754
1755 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1756 // 3.5. Start WebRTC background loops
1757 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1758 tracing::info!("🚀 Starting WebRTC background loops");
1759
1760 // Start WebRtcCoordinator signaling loop
1761 if let Some(coordinator) = &node_ref.webrtc_coordinator {
1762 coordinator.clone().start().await.map_err(|e| {
1763 ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1764 })?;
1765 tracing::info!("✅ WebRtcCoordinator signaling loop started");
1766 }
1767
1768 // Start WebRtcGate message receive loop (route to Mailbox)
1769 if let Some(gate) = &node_ref.webrtc_gate {
1770 gate.start_receive_loop(node_ref.mailbox.clone())
1771 .await
1772 .map_err(|e| {
1773 ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1774 })?;
1775 tracing::info!("✅ WebRtcGate → Mailbox routing started");
1776 }
1777
1778 // Start WebSocketGate message receive loop (route to Mailbox, direct-connect mode)
1779 if let Some(ws_gate) = &node_ref.websocket_gate {
1780 ws_gate
1781 .start_receive_loop(node_ref.mailbox.clone())
1782 .await
1783 .map_err(|e| {
1784 ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1785 })?;
1786 tracing::info!("✅ WebSocketGate → Mailbox routing started");
1787 }
1788 tracing::info!("✅ WebRTC background loops started");
1789
1790 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1791 // 4.6. Start Inproc receive loop (Shell → Guest)
1792 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1793 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1794 tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1795 // Start Guest receive loop (Shell → Guest REQUEST)
1796 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1797 let node = node_ref.clone();
1798 let request_rx_lane = shell_to_workload
1799 .get_lane(PayloadType::RpcReliable, None)
1800 .await
1801 .map_err(|e| {
1802 ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1803 })?;
1804 let response_tx = workload_to_shell.clone();
1805 let shutdown = shutdown_token.clone();
1806
1807 let inproc_handle = tokio::spawn(async move {
1808 loop {
1809 tokio::select! {
1810 _ = shutdown.cancelled() => {
1811 tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1812 break;
1813 }
1814 envelope_result = request_rx_lane.recv_envelope() => {
1815 match envelope_result {
1816 Ok(envelope) => {
1817 let request_id = envelope.request_id.clone();
1818 tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1819 // Extract and set tracing context from envelope
1820 #[cfg(feature = "opentelemetry")]
1821 let span = {
1822 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1823 let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1824 set_parent_from_rpc_envelope(&span, &envelope);
1825 span
1826 };
1827
1828 // Shell calls have no caller_id (local process communication)
1829 let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1830 #[cfg(feature = "opentelemetry")]
1831 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1832
1833 match handle_incoming_fut.await {
1834 Ok(response_bytes) => {
1835 // Send RESPONSE back via workload_to_shell
1836 // Keep same route_key (no prefix needed - separate channels!)
1837 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1838 let mut response_envelope = RpcEnvelope {
1839 route_key: envelope.route_key.clone(),
1840 payload: Some(response_bytes),
1841 error: None,
1842 traceparent: None,
1843 tracestate: None,
1844 request_id: request_id.clone(),
1845 metadata: Vec::new(),
1846 timeout_ms: 30000,
1847 };
1848 // Inject tracing context
1849 #[cfg(feature = "opentelemetry")]
1850 inject_span_context_to_rpc(&span, &mut response_envelope);
1851
1852 // Send via Guest → Shell channel
1853 let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1854 #[cfg(feature = "opentelemetry")]
1855 let send_response_fut = send_response_fut.instrument(span.clone());
1856 if let Err(e) = send_response_fut.await {
1857 tracing::error!(
1858 severity = 7,
1859 error_category = "transport_error",
1860 request_id = %request_id,
1861 "❌ Failed to send RESPONSE to Shell: {:?}",
1862 e
1863 );
1864 }
1865 }
1866 Err(e) => {
1867 tracing::error!(
1868 severity = 6,
1869 error_category = "handler_error",
1870 request_id = %request_id,
1871 route_key = %envelope.route_key,
1872 "❌ Guest message handling failed: {:?}",
1873 e
1874 );
1875
1876 // Send error response (system-level error on envelope)
1877 let error_response = actr_protocol::ErrorResponse {
1878 code: protocol_error_to_code(&e),
1879 message: e.to_string(),
1880 };
1881 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1882 let mut error_envelope = RpcEnvelope {
1883 route_key: envelope.route_key.clone(),
1884 payload: None,
1885 error: Some(error_response),
1886 traceparent: envelope.traceparent.clone(),
1887 tracestate: envelope.tracestate.clone(),
1888 request_id: request_id.clone(),
1889 metadata: Vec::new(),
1890 timeout_ms: 30000,
1891 };
1892 // Inject tracing context
1893 #[cfg(feature = "opentelemetry")]
1894 inject_span_context_to_rpc(&span, &mut error_envelope);
1895
1896 let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1897 #[cfg(feature = "opentelemetry")]
1898 let send_error_response_fut = send_error_response_fut.instrument(span);
1899 if let Err(send_err) = send_error_response_fut.await {
1900 tracing::error!(
1901 severity = 7,
1902 error_category = "transport_error",
1903 request_id = %request_id,
1904 "❌ Failed to send ERROR response to Shell: {:?}",
1905 send_err
1906 );
1907 }
1908 }
1909 }
1910 }
1911 Err(e) => {
1912 tracing::error!(
1913 severity = 8,
1914 error_category = "transport_error",
1915 "❌ Failed to receive from Shell → Guest lane: {:?}",
1916 e
1917 );
1918 break;
1919 }
1920 }
1921 }
1922 }
1923 }
1924 tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1925 });
1926 task_handles.push(inproc_handle);
1927 }
1928 }
1929 tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1930
1931 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1932 // 4.7. Start Shell receive loop (Guest → Shell RESPONSE)
1933 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1934 tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1935 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1936 // Start Shell receive loop (Guest → Shell RESPONSE)
1937 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1938 let response_rx_lane = workload_to_shell
1939 .get_lane(PayloadType::RpcReliable, None)
1940 .await
1941 .map_err(|e| {
1942 ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1943 })?;
1944 let request_mgr = shell_to_workload.clone();
1945 let shutdown = shutdown_token.clone();
1946
1947 let shell_receive_handle = tokio::spawn(async move {
1948 loop {
1949 tokio::select! {
1950 _ = shutdown.cancelled() => {
1951 tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1952 break;
1953 }
1954 envelope_result = response_rx_lane.recv_envelope() => {
1955 match envelope_result {
1956 Ok(envelope) => {
1957 tracing::debug!(
1958 "📨 Shell received RESPONSE from Guest: request_id={}",
1959 envelope.request_id
1960 );
1961
1962 // Check if response is success or error
1963 match (envelope.payload, envelope.error) {
1964 (Some(payload), None) => {
1965 // Success response
1966 if let Err(e) = request_mgr
1967 .complete_response(&envelope.request_id, payload)
1968 .await
1969 {
1970 tracing::warn!(
1971 severity = 4,
1972 error_category = "orphan_response",
1973 request_id = %envelope.request_id,
1974 "⚠️ No pending request found for response: {:?}",
1975 e
1976 );
1977 }
1978 }
1979 (None, Some(error)) => {
1980 // Error response — reconstruct the precise ActrError variant
1981 // from the wire code so binding-visible classification
1982 // (UnknownRoute / PermissionDenied / TimedOut / …) is preserved
1983 // instead of collapsing every error into Unavailable.
1984 let actr_err = wire_code_to_actr_error(error.code, error.message);
1985 if let Err(e) = request_mgr
1986 .complete_error(&envelope.request_id, actr_err)
1987 .await
1988 {
1989 tracing::warn!(
1990 severity = 4,
1991 error_category = "orphan_response",
1992 request_id = %envelope.request_id,
1993 "⚠️ No pending request found for error response: {:?}",
1994 e
1995 );
1996 }
1997 }
1998 _ => {
1999 tracing::error!(
2000 severity = 7,
2001 error_category = "protocol_error",
2002 request_id = %envelope.request_id,
2003 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
2004 );
2005 }
2006 }
2007 }
2008 Err(e) => {
2009 tracing::error!(
2010 severity = 8,
2011 error_category = "transport_error",
2012 "❌ Failed to receive from Guest → Shell lane: {:?}",
2013 e
2014 );
2015 break;
2016 }
2017 }
2018 }
2019 }
2020 }
2021 tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
2022 });
2023 task_handles.push(shell_receive_handle);
2024 }
2025 }
2026 tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
2027
2028 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2029 // 4.9. Mailbox backpressure watchdog
2030 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2031 //
2032 // Emits the framework `on_mailbox_backpressure` hook once per
2033 // rising-edge crossing of the configured threshold.
2034 //
2035 // Preferred path: a push-based notification from the mailbox
2036 // backend via [`Mailbox::set_depth_observer`], which runs
2037 // synchronously on every enqueue and has zero worst-case delay.
2038 //
2039 // Fallback path: mailbox backends without depth support (or
2040 // which can't cheaply compute depth on every enqueue) keep
2041 // using a 1 Hz poll of [`Mailbox::status`].
2042 let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
2043 {
2044 use std::sync::atomic::{AtomicBool, Ordering};
2045 let mailbox = node_ref.mailbox.clone();
2046 let shutdown = shutdown_token.clone();
2047 let hook_cb = node_hook_callback.clone();
2048 let triggered = Arc::new(AtomicBool::new(false));
2049
2050 // Shared rising-edge state + hook-firing closure used by
2051 // both the push and polling code paths.
2052 let fire_if_rising = {
2053 let triggered = triggered.clone();
2054 let hook_cb = hook_cb.clone();
2055 Arc::new(move |queue_len: usize| {
2056 if queue_len >= backpressure_threshold {
2057 if !triggered.swap(true, Ordering::AcqRel) {
2058 if let Some(cb) = hook_cb.as_ref() {
2059 let cb = cb.clone();
2060 tokio::spawn(async move {
2061 cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
2062 queue_len,
2063 threshold: backpressure_threshold,
2064 })
2065 .await;
2066 });
2067 } else {
2068 tracing::warn!(
2069 queue_len,
2070 threshold = backpressure_threshold,
2071 "mailbox backpressure",
2072 );
2073 }
2074 }
2075 } else if triggered.swap(false, Ordering::AcqRel) {
2076 tracing::info!(
2077 queue_len,
2078 threshold = backpressure_threshold,
2079 "mailbox backpressure cleared",
2080 );
2081 }
2082 })
2083 };
2084
2085 // Try the push path first. The observer installs only if
2086 // the backend supports it; otherwise `installed` is `false`
2087 // and we fall through to polling.
2088 struct EnqueueObserver {
2089 fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2090 }
2091 impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2092 fn on_depth_change(&self, queued_messages: usize) {
2093 (self.fire)(queued_messages);
2094 }
2095 }
2096
2097 let installed = {
2098 let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2099 Arc::new(EnqueueObserver {
2100 fire: fire_if_rising.clone(),
2101 });
2102 mailbox.set_depth_observer(observer)
2103 };
2104
2105 if installed {
2106 tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2107 } else {
2108 tracing::debug!(
2109 "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2110 );
2111 let mailbox_for_poll = mailbox.clone();
2112 let shutdown_for_poll = shutdown.clone();
2113 let fire_for_poll = fire_if_rising.clone();
2114 let watchdog_handle = tokio::spawn(async move {
2115 let mut ticker = tokio::time::interval(Duration::from_secs(1));
2116 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2117 loop {
2118 tokio::select! {
2119 _ = shutdown_for_poll.cancelled() => {
2120 tracing::debug!(
2121 "mailbox backpressure watchdog shutting down"
2122 );
2123 break;
2124 }
2125 _ = ticker.tick() => {
2126 let status = match mailbox_for_poll.status().await {
2127 Ok(s) => s,
2128 Err(e) => {
2129 tracing::debug!(?e, "mailbox status poll failed");
2130 continue;
2131 }
2132 };
2133 fire_for_poll(status.queued_messages as usize);
2134 }
2135 }
2136 }
2137 });
2138 task_handles.push(watchdog_handle);
2139 }
2140 }
2141
2142 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2143 // 5. Start Mailbox processing loop (State Path)
2144 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2145 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2146 {
2147 let node = node_ref.clone();
2148 let mailbox = node_ref.mailbox.clone();
2149 let webrtc_gate = node_ref.webrtc_gate.clone();
2150 let ws_gate = node_ref.websocket_gate.clone();
2151 let shutdown = shutdown_token.clone();
2152
2153 let mailbox_handle = tokio::spawn(async move {
2154 loop {
2155 tokio::select! {
2156 // Listen for shutdown signal
2157 _ = shutdown.cancelled() => {
2158 tracing::info!("📭 Mailbox loop received shutdown signal");
2159 break;
2160 }
2161 // Dequeue messages (by priority)
2162 result = mailbox.dequeue() => {
2163 match result {
2164 Ok(messages) => {
2165 if messages.is_empty() {
2166 // Queue empty, sleep briefly
2167 tokio::time::sleep(Duration::from_millis(10)).await;
2168 continue;
2169 }
2170 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2171
2172 // Process messages one by one
2173 for msg_record in messages {
2174 // Deserialize RpcEnvelope (Protobuf)
2175 match RpcEnvelope::decode(&msg_record.payload[..]) {
2176 Ok(envelope) => {
2177 let request_id = envelope.request_id.clone();
2178 let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2179 tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2180
2181 tracing::debug!("📦 Processing message: request_id={}", request_id);
2182 #[cfg(feature = "opentelemetry")]
2183 let span = {
2184 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2185 let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2186 set_parent_from_rpc_envelope(&span, &envelope);
2187 span
2188 };
2189
2190 // Decode caller_id from MessageRecord.from (transport layer)
2191 let caller_id_result = ActrId::decode(&msg_record.from[..]);
2192 let caller_id_ref = caller_id_result.as_ref().ok();
2193
2194 if caller_id_ref.is_none() {
2195 tracing::warn!(
2196 request_id = %request_id,
2197 "⚠️ Failed to decode caller_id from MessageRecord.from"
2198 );
2199 }
2200
2201 // Call handle_incoming with caller_id from transport layer
2202 let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2203 #[cfg(feature = "opentelemetry")]
2204 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2205
2206 /// Send `response_envelope` back to `caller` via the
2207 /// best available transport.
2208 ///
2209 /// Priority: inbound WebSocket connection (if caller
2210 /// dialled us directly) → WebRTC gate. Returns the
2211 /// first transport error encountered, if any.
2212 async fn send_envelope_to_caller(
2213 ws_gate: &Option<Arc<crate::wire::websocket::WebSocketGate>>,
2214 webrtc_gate: &Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
2215 caller: &ActrId,
2216 response_envelope: RpcEnvelope,
2217 request_id: &str,
2218 ) {
2219 // 1. Try inbound WebSocket connection first.
2220 if let Some(wsg) = ws_gate {
2221 match wsg.send_response(caller, response_envelope.clone()).await {
2222 Ok(true) => return, // sent successfully
2223 Ok(false) => {
2224 tracing::debug!(
2225 request_id = request_id,
2226 caller = %caller,
2227 "No inbound WS connection for caller; falling back to WebRTC gate"
2228 );
2229 }
2230 Err(e) => {
2231 tracing::warn!(
2232 severity = 5,
2233 error_category = "transport_error",
2234 request_id = request_id,
2235 "WebSocketGate send_response failed, falling back: {:?}", e
2236 );
2237 }
2238 }
2239 }
2240
2241 // 2. Fall back to WebRTC gate.
2242 if let Some(gate) = webrtc_gate {
2243 if let Err(e) = gate.send_response(caller, response_envelope).await {
2244 tracing::error!(
2245 severity = 7,
2246 error_category = "transport_error",
2247 request_id = request_id,
2248 "❌ WebRtcGate send_response failed: {:?}", e
2249 );
2250 }
2251 } else {
2252 tracing::error!(
2253 severity = 7,
2254 error_category = "transport_error",
2255 request_id = request_id,
2256 "❌ No gate available to send response"
2257 );
2258 }
2259 }
2260
2261 match handle_incoming_fut.await {
2262 Ok(response_bytes) => {
2263 match caller_id_result {
2264 Ok(caller) => {
2265 // Construct response RpcEnvelope (reuse request_id!)
2266 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2267 let mut response_envelope = RpcEnvelope {
2268 request_id: request_id.clone(),
2269 route_key: envelope.route_key.clone(),
2270 payload: Some(response_bytes),
2271 error: None,
2272 traceparent: envelope.traceparent.clone(),
2273 tracestate: envelope.tracestate.clone(),
2274 metadata: Vec::new(),
2275 timeout_ms: 30000,
2276 };
2277 // Inject tracing context
2278 #[cfg(feature = "opentelemetry")]
2279 inject_span_context_to_rpc(&span, &mut response_envelope);
2280
2281 #[cfg(feature = "opentelemetry")]
2282 let send_fut = send_envelope_to_caller(
2283 &ws_gate,
2284 &webrtc_gate,
2285 &caller,
2286 response_envelope,
2287 &request_id,
2288 ).instrument(span);
2289 #[cfg(not(feature = "opentelemetry"))]
2290 let send_fut = send_envelope_to_caller(
2291 &ws_gate,
2292 &webrtc_gate,
2293 &caller,
2294 response_envelope,
2295 &request_id,
2296 );
2297 send_fut.await;
2298 }
2299 Err(e) => {
2300 tracing::error!(
2301 severity = 8,
2302 error_category = "protobuf_decode",
2303 request_id = %envelope.request_id,
2304 "❌ Failed to decode caller_id: {:?}",
2305 e
2306 );
2307 }
2308 }
2309
2310 // ACK message
2311 if let Err(e) = mailbox.ack(msg_record.id).await {
2312 tracing::error!(
2313 severity = 9,
2314 error_category = "mailbox_error",
2315 request_id = %envelope.request_id,
2316 message_id = %msg_record.id,
2317 "❌ Mailbox ACK failed: {:?}",
2318 e
2319 );
2320 }
2321 }
2322 Err(e) => {
2323 tracing::error!(
2324 severity = 6,
2325 error_category = "handler_error",
2326 request_id = %envelope.request_id,
2327 route_key = %envelope.route_key,
2328 "❌ handle_incoming failed: {:?}", e
2329 );
2330
2331 // Send error envelope back to caller so it
2332 // receives a structured error rather than
2333 // waiting until its deadline fires.
2334 if let Ok(caller) = caller_id_result {
2335 let error_response = actr_protocol::ErrorResponse {
2336 code: protocol_error_to_code(&e),
2337 message: e.to_string(),
2338 };
2339 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2340 let mut error_envelope = RpcEnvelope {
2341 request_id: request_id.clone(),
2342 route_key: envelope.route_key.clone(),
2343 payload: None,
2344 error: Some(error_response),
2345 traceparent: envelope.traceparent.clone(),
2346 tracestate: envelope.tracestate.clone(),
2347 metadata: Vec::new(),
2348 timeout_ms: 30000,
2349 };
2350 #[cfg(feature = "opentelemetry")]
2351 inject_span_context_to_rpc(&span, &mut error_envelope);
2352
2353 send_envelope_to_caller(
2354 &ws_gate,
2355 &webrtc_gate,
2356 &caller,
2357 error_envelope,
2358 &request_id,
2359 ).await;
2360 }
2361
2362 // ACK to avoid infinite retries
2363 let _ = mailbox.ack(msg_record.id).await;
2364 }
2365 }
2366 }
2367 Err(e) => {
2368 // Poison message - cannot decode RpcEnvelope
2369 tracing::error!(
2370 severity = 9,
2371 error_category = "protobuf_decode",
2372 message_id = %msg_record.id,
2373 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2374 e
2375 );
2376
2377 // Write to Dead Letter Queue
2378 use actr_runtime_mailbox::DlqRecord;
2379 use chrono::Utc;
2380 use uuid::Uuid;
2381
2382 let dlq_record = DlqRecord {
2383 id: Uuid::new_v4(),
2384 original_message_id: Some(msg_record.id.to_string()),
2385 from: Some(msg_record.from.clone()),
2386 to: node.actor_id.as_ref().map(|id| {
2387 let mut buf = Vec::new();
2388 id.encode(&mut buf).unwrap();
2389 buf
2390 }),
2391 raw_bytes: msg_record.payload.clone(),
2392 error_message: format!("Protobuf decode failed: {e}"),
2393 error_category: "protobuf_decode".to_string(),
2394 trace_id: format!("mailbox-{}", msg_record.id),
2395 request_id: None,
2396 created_at: Utc::now(),
2397 redrive_attempts: 0,
2398 last_redrive_at: None,
2399 context: Some(format!(
2400 r#"{{"source":"mailbox","priority":"{}"}}"#,
2401 match msg_record.priority {
2402 actr_runtime_mailbox::MessagePriority::High => "high",
2403 actr_runtime_mailbox::MessagePriority::Normal => "normal",
2404 }
2405 )),
2406 };
2407
2408 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2409 tracing::error!(
2410 severity = 10,
2411 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2412 dlq_err
2413 );
2414 } else {
2415 tracing::warn!(
2416 severity = 9,
2417 "☠️ Poison message moved to DLQ: message_id={}",
2418 msg_record.id
2419 );
2420 }
2421
2422 // ACK the poison message to remove from mailbox
2423 let _ = mailbox.ack(msg_record.id).await;
2424 }
2425 }
2426 }
2427 }
2428 Err(e) => {
2429 tracing::error!(
2430 severity = 9,
2431 error_category = "mailbox_error",
2432 "❌ Mailbox dequeue failed: {:?}", e
2433 );
2434 tokio::time::sleep(Duration::from_secs(1)).await;
2435 }
2436 }
2437 }
2438 }
2439 }
2440 tracing::info!("✅ Mailbox processing loop terminated gracefully");
2441 });
2442
2443 task_handles.push(mailbox_handle);
2444 }
2445 tracing::info!("✅ Mailbox processing loop started");
2446 tracing::info!("✅ ActrNode started successfully");
2447
2448 {
2449 let ready_ctx = bootstrap_ctx_builder
2450 .build_bootstrap(&actor_id, &credential_state.credential().await);
2451 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2452 let call_executor =
2453 lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2454 let mut workload = node_ref.workload_dispatch.lock().await;
2455 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2456 "on_ready",
2457 workload.on_ready(ready_ctx, invocation, &call_executor),
2458 )
2459 .await
2460 {
2461 tracing::warn!(error = %e, "workload on_ready returned Err");
2462 }
2463 }
2464
2465 // Create ActrRefShared
2466 let shared = Arc::new(ActrRefShared {
2467 actor_id,
2468 bootstrap_ctx_builder,
2469 credential_state,
2470 shutdown_token,
2471 task_handles: Mutex::new(task_handles),
2472 });
2473
2474 // Create ActrRef
2475 tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2476
2477 Ok(ActrRef { shared })
2478 }
2479}