actr_hyper/lifecycle/node.rs
1//! Node runtime inner — holds all running-state fields for an attached node.
2//!
//! This module is the internal implementation backing the public
//! `Node<Attached>` / `Node<Registered>` typestate chain defined at the
//! crate root (`lib.rs`). The `Inner` struct itself is crate-private;
//! consumers interact with it indirectly through `Node<S>` → `ActrRef`
//! transitions.
7
8use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21 AIdCredential, ActorResult, ActrError, ActrId, ConnectionNotReadyInfo, PayloadType,
22 RegisterAuthMode, RegisterRequest, RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
/// Internal running-state of an attached node.
///
/// Holds every field required to run a workload after `Hyper::attach` has
/// bound a package. Kept private to the crate: external callers use the
/// public `Node<S>` wrappers defined at the crate root and the `ActrRef`
/// handle returned by `Node::start`.
///
/// Constructed by `Inner::build`, which fills in the always-available
/// infrastructure (mailbox, DLQ, signaling client, inproc transports and
/// gate, stream registries, dedup state) and leaves every registration- or
/// WebRTC-dependent field as `None`.
pub(crate) struct Inner {
    /// Runtime configuration (signaling URL, mailbox path, ACL, WebRTC options, ...).
    pub(crate) config: actr_config::RuntimeConfig,

    /// Persistent mailbox. `build()` backs it with a SQLite database at
    /// `config.mailbox_path`, or an in-memory database (`":memory:"`) when no
    /// path is configured.
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages. Stored next to the mailbox
    /// database (`<mailbox_path>.dlq`), or in memory when the mailbox is.
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// In-process gate for `Dest::Shell` / `Dest::Local` calls.
    ///
    /// Created in `build()` together with `shell_to_workload` so the inproc
    /// lane is usable as soon as the node exists, even before registration.
    pub(crate) inproc_gate: Gate,

    /// Cross-process gate for `Dest::Actor(_)` calls.
    ///
    /// `None` until `start()` finishes WebRTC / PeerGate initialization. Any
    /// outbound call issued before that point returns `Internal("PeerGate
    /// not initialized yet")` — see `RuntimeContext::select_gate`.
    pub(crate) outproc_gate: Option<Gate>,

    /// DataStream callback registry shared between the inbound WebRTC / WS
    /// gates (which dispatch into it) and `RuntimeContext`
    /// (register_stream / send_data_stream).
    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,

    /// MediaTrack callback registry shared between WebRTC media tracks and
    /// `RuntimeContext` (register_media_track / send_media_sample).
    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,

    /// Signaling client. `build()` creates a WebSocket client for
    /// `config.signaling_url` and starts its reconnect manager immediately.
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID (obtained after startup). While `None`, `handle_incoming`
    /// rejects every message with `ActrError::Internal`.
    pub(crate) actor_id: Option<ActrId>,

    /// Actor credential (obtained after startup, used for subsequent
    /// authentication messages). While `None`, `handle_incoming` rejects
    /// every message with `ActrError::Internal`.
    pub(crate) credential_state: Option<CredentialState>,

    /// WebRTC coordinator (created after startup)
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    /// Peer transport manager (created after startup)
    pub(crate) peer_transport: Option<Arc<crate::transport::PeerTransport>>,

    /// WebRTC Gate (created after startup)
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// WebSocket Gate (direct-connect mode inbound, optional)
    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,

    /// Shell → Workload transport (REQUEST direction)
    ///
    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing).
    /// Always `Some` after `build()`; it also backs `inproc_gate`.
    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,

    /// Workload → Shell transport (RESPONSE direction)
    ///
    /// Workload sends RESPONSE to Shell (its pending_requests are separate from Shell's).
    /// Always `Some` after `build()`.
    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,

    /// Shutdown token for graceful shutdown
    pub(crate) shutdown_token: CancellationToken,

    /// Packaged manifest.lock.toml content loaded at startup for fingerprint lookups.
    ///
    /// Wrapped in `Arc` so per-request `RuntimeContext` clones only bump a refcount
    /// instead of deep-cloning the dependency vector. `None` when the package
    /// shipped no lock file.
    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,

    /// Network event receiver (from NetworkEventHandle)
    pub(crate) network_event_rx:
        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>>,

    /// Network event debounce configuration
    pub(crate) network_event_debounce_config:
        Option<crate::lifecycle::network_event::DebounceConfig>,

    /// Request deduplication state keyed by `request_id`.
    ///
    /// Caches completed results (entry lifetime `DEDUP_TTL`, documented as
    /// 15 s) and tracks in-flight requests, so a retried request is not
    /// processed twice.
    pub(crate) dedup_state: Arc<Mutex<DedupState>>,

    /// Verified package manifest for package-backed nodes.
    // NOTE(review): `dead_code` is allowed presumably because this is only read
    // in some configurations — confirm before removing the attribute.
    #[allow(dead_code)]
    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,

    /// Pre-issued registration credential injected by the Hyper layer during
    /// the `Attached → Registered` state transition. `start()` uses it directly
    /// instead of re-registering with the signaling server.
    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,

    /// Shared WebSocket direct-connect address map populated by discovery.
    ///
    /// Shared with `DefaultWireBuilder` so discovered ws:// URLs can be reused
    /// directly instead of relying on a static url_template.
    /// The map is keyed by `ActrId`.
    pub(crate) discovered_ws_addresses:
        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,

    /// Runtime workload (WASM, dynclib, etc.)
    ///
    /// `handle_incoming` dispatches through this workload.
    ///
    /// The `Mutex` serializes dispatch into a single guest actor instance:
    /// `WasmWorkload::handle` and `DynClibWorkload::handle` both take
    /// `&mut self` because the underlying Wasmtime `Store` / native guest
    /// ABI is single-threaded, so concurrent dispatch through the same
    /// instance would be unsound. Lifecycle hooks also take this lock because
    /// package-backed WASM / dynclib workloads expose them on the same guest
    /// instance; transport and other observation hooks reach linked workloads
    /// through `hook_observer` without holding this lock.
    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,

    /// Optional shell-side observer that receives linked-workload transport /
    /// credential / mailbox hook invocations.
    ///
    /// `None` means "no observer installed"; the built-in tracing defaults
    /// still fire from the event-source wiring sites. When `Some`, hook
    /// invocations are dispatched through `lifecycle::hooks::spawn_hook`
    /// so panics in observer code cannot unwind into the event source.
    // NOTE(review): `dead_code` allowed presumably because the observer is only
    // consulted in some builds — confirm.
    #[allow(dead_code)]
    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,

    /// Queue-length threshold at which the mailbox backpressure
    /// watchdog fires the framework `on_mailbox_backpressure` hook.
    ///
    /// Resolved from `HyperConfig` at node construction time (passed into
    /// `build()`) so the runtime loop does not need to hold a reference back
    /// to `HyperConfig`.
    pub(crate) mailbox_backpressure_threshold: usize,

    /// Lead time before credential expiry at which the framework fires
    /// the `on_credential_expiring` hook. Resolved from `HyperConfig`
    /// at node construction time (passed into `build()`).
    // NOTE(review): `dead_code` allowed presumably because the expiry watchdog
    // reads it only in some configurations — confirm.
    #[allow(dead_code)]
    pub(crate) credential_expiry_warning: Duration,
}
175
176/// Credential state for shared access between tasks
177#[derive(Clone)]
178pub struct CredentialState {
179 inner: Arc<RwLock<CredentialStateInner>>,
180}
181
182#[derive(Clone)]
183struct CredentialStateInner {
184 credential: AIdCredential,
185 expires_at: Option<prost_types::Timestamp>,
186 /// HMAC time-limited TURN credential, updated together with credential on registration/renewal
187 turn_credential: Option<TurnCredential>,
188}
189
190impl CredentialState {
191 /// Create a new CredentialState with TURN credential
192 pub fn new(
193 credential: AIdCredential,
194 expires_at: Option<prost_types::Timestamp>,
195 turn_credential: Option<TurnCredential>,
196 ) -> Self {
197 Self {
198 inner: Arc::new(RwLock::new(CredentialStateInner {
199 credential,
200 expires_at,
201 turn_credential,
202 })),
203 }
204 }
205
206 pub async fn credential(&self) -> AIdCredential {
207 self.inner.read().await.credential.clone()
208 }
209
210 pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
211 self.inner.read().await.expires_at
212 }
213
214 /// Get TURN credential (HMAC time-limited credential)
215 pub async fn turn_credential(&self) -> Option<TurnCredential> {
216 self.inner.read().await.turn_credential.clone()
217 }
218
219 /// Update credential and TURN credential
220 ///
221 /// Called on credential renewal; only overwrites the old TURN credential when the new one is not empty
222 pub(crate) async fn update(
223 &self,
224 credential: AIdCredential,
225 expires_at: Option<prost_types::Timestamp>,
226 turn_credential: Option<TurnCredential>,
227 ) {
228 let mut guard = self.inner.write().await;
229 guard.credential = credential;
230 guard.expires_at = expires_at;
231 if turn_credential.is_some() {
232 guard.turn_credential = turn_credential;
233 }
234 }
235}
236
237/// Host operation executor - routes guest outbound calls through RuntimeContext
238///
239/// Called by the workload dispatch path in `handle_incoming`.
240async fn host_operation_handler(
241 ctx: crate::context::RuntimeContext,
242 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
243 pending: crate::workload::HostOperation,
244) -> crate::workload::HostOperationResult {
245 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
246 use actr_framework::guest::dynclib_abi::code as abi_code;
247 use actr_framework::{Context as _, Dest};
248 use actr_protocol::{DataStream, PayloadType};
249
250 /// Map `ActrError` to ABI error code, preserving semantics for guest-side discrimination
251 fn actr_error_to_code(err: &ActrError) -> i32 {
252 match err {
253 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
254 _ => abi_code::GENERIC_ERROR,
255 }
256 }
257
258 match pending {
259 HostOperation::CallRaw(req) => {
260 match ctx
261 .call_raw(
262 &Dest::Actor(req.target),
263 req.route_key,
264 PayloadType::RpcReliable,
265 bytes::Bytes::from(req.payload),
266 30_000,
267 )
268 .await
269 {
270 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
271 Err(e) => {
272 tracing::error!("call_raw routing failed: {e:?}");
273 HostOperationResult::Error(actr_error_to_code(&e))
274 }
275 }
276 }
277
278 HostOperation::Call(req) => {
279 let dest = match decode_dest(&req.dest) {
280 Some(d) => d,
281 None => {
282 tracing::error!(route_key = req.route_key, "call: dest decode failed");
283 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
284 }
285 };
286 match ctx
287 .call_raw(
288 &dest,
289 req.route_key,
290 PayloadType::RpcReliable,
291 bytes::Bytes::from(req.payload),
292 30_000,
293 )
294 .await
295 {
296 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
297 Err(e) => {
298 tracing::error!("call routing failed: {e:?}");
299 HostOperationResult::Error(actr_error_to_code(&e))
300 }
301 }
302 }
303
304 HostOperation::Tell(req) => {
305 let dest = match decode_dest(&req.dest) {
306 Some(d) => d,
307 None => {
308 tracing::error!(route_key = req.route_key, "tell: dest decode failed");
309 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
310 }
311 };
312 match ctx
313 .tell_raw(
314 &dest,
315 req.route_key,
316 PayloadType::RpcReliable,
317 bytes::Bytes::from(req.payload),
318 )
319 .await
320 {
321 Ok(()) => HostOperationResult::Done,
322 Err(e) => {
323 tracing::error!("tell routing failed: {e:?}");
324 HostOperationResult::Error(actr_error_to_code(&e))
325 }
326 }
327 }
328
329 HostOperation::Discover(req) => {
330 match ctx.discover_route_candidate(&req.target_type).await {
331 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
332 Err(e) => {
333 tracing::error!("discover failed: {e:?}");
334 HostOperationResult::Error(actr_error_to_code(&e))
335 }
336 }
337 }
338
339 HostOperation::RegisterStream(req) => {
340 let stream_id = req.stream_id;
341 let callback_ctx = ctx.clone();
342 let callback_workload_dispatch = workload_dispatch.clone();
343 match ctx
344 .register_stream(stream_id, move |chunk: DataStream, sender| {
345 let ctx_for_executor = callback_ctx.clone();
346 let workload_dispatch = callback_workload_dispatch.clone();
347 Box::pin(async move {
348 let invocation = crate::workload::InvocationContext {
349 self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
350 caller_id: Some(sender.clone()),
351 request_id: format!(
352 "data-stream:{}:{}",
353 chunk.stream_id, chunk.sequence
354 ),
355 };
356 let call_executor: crate::workload::HostAbiFn =
357 std::sync::Arc::new(move |pending| {
358 let ctx = ctx_for_executor.clone();
359 Box::pin(async move {
360 stream_callback_host_operation_handler(ctx, pending).await
361 })
362 });
363 let mut guard = workload_dispatch.lock().await;
364 guard
365 .dispatch_data_stream(chunk, sender, invocation, &call_executor)
366 .await
367 })
368 })
369 .await
370 {
371 Ok(()) => HostOperationResult::Done,
372 Err(e) => {
373 tracing::error!("register_stream failed: {e:?}");
374 HostOperationResult::Error(actr_error_to_code(&e))
375 }
376 }
377 }
378
379 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
380 Ok(()) => HostOperationResult::Done,
381 Err(e) => {
382 tracing::error!("unregister_stream failed: {e:?}");
383 HostOperationResult::Error(actr_error_to_code(&e))
384 }
385 },
386
387 HostOperation::SendDataStream(req) => {
388 let dest = match decode_dest(&req.dest) {
389 Some(d) => d,
390 None => {
391 tracing::error!("send_data_stream: dest decode failed");
392 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
393 }
394 };
395 let payload_type = match PayloadType::try_from(req.payload_type) {
396 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
397 PayloadType::try_from(req.payload_type).expect("checked payload type")
398 }
399 Ok(other) => {
400 tracing::error!(?other, "send_data_stream: invalid stream payload type");
401 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
402 }
403 Err(_) => {
404 tracing::error!(
405 payload_type = req.payload_type,
406 "send_data_stream: unknown payload type"
407 );
408 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
409 }
410 };
411 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
412 Ok(()) => HostOperationResult::Done,
413 Err(e) => {
414 tracing::error!("send_data_stream failed: {e:?}");
415 HostOperationResult::Error(actr_error_to_code(&e))
416 }
417 }
418 }
419 }
420}
421
422fn lifecycle_invocation(
423 actor_id: &ActrId,
424 request_id: &'static str,
425) -> crate::workload::InvocationContext {
426 crate::workload::InvocationContext {
427 self_id: actor_id.clone(),
428 caller_id: None,
429 request_id: request_id.to_string(),
430 }
431}
432
433pub(crate) fn lifecycle_host_abi(
434 ctx: crate::context::RuntimeContext,
435 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
436) -> crate::workload::HostAbiFn {
437 std::sync::Arc::new(move |pending| {
438 let ctx = ctx.clone();
439 let workload_dispatch = workload_dispatch.clone();
440 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
441 })
442}
443
444async fn stream_callback_host_operation_handler(
445 ctx: crate::context::RuntimeContext,
446 pending: crate::workload::HostOperation,
447) -> crate::workload::HostOperationResult {
448 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
449 use actr_framework::guest::dynclib_abi::code as abi_code;
450 use actr_framework::{Context as _, Dest};
451 use actr_protocol::PayloadType;
452
453 fn actr_error_to_code(err: &ActrError) -> i32 {
454 match err {
455 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
456 _ => abi_code::GENERIC_ERROR,
457 }
458 }
459
460 match pending {
461 HostOperation::CallRaw(req) => {
462 match ctx
463 .call_raw(
464 &Dest::Actor(req.target),
465 req.route_key,
466 PayloadType::RpcReliable,
467 bytes::Bytes::from(req.payload),
468 30_000,
469 )
470 .await
471 {
472 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
473 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
474 }
475 }
476 HostOperation::Call(req) => {
477 let dest = match decode_dest(&req.dest) {
478 Some(d) => d,
479 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
480 };
481 match ctx
482 .call_raw(
483 &dest,
484 req.route_key,
485 PayloadType::RpcReliable,
486 bytes::Bytes::from(req.payload),
487 30_000,
488 )
489 .await
490 {
491 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
492 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
493 }
494 }
495 HostOperation::Tell(req) => {
496 let dest = match decode_dest(&req.dest) {
497 Some(d) => d,
498 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
499 };
500 match ctx
501 .tell_raw(
502 &dest,
503 req.route_key,
504 PayloadType::RpcReliable,
505 bytes::Bytes::from(req.payload),
506 )
507 .await
508 {
509 Ok(()) => HostOperationResult::Done,
510 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
511 }
512 }
513 HostOperation::Discover(req) => {
514 match ctx.discover_route_candidate(&req.target_type).await {
515 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
516 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
517 }
518 }
519 HostOperation::RegisterStream(_) => {
520 tracing::error!("register_stream from inside a stream callback is not supported");
521 HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
522 }
523 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
524 Ok(()) => HostOperationResult::Done,
525 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
526 },
527 HostOperation::SendDataStream(req) => {
528 let dest = match decode_dest(&req.dest) {
529 Some(d) => d,
530 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
531 };
532 let payload_type = match PayloadType::try_from(req.payload_type) {
533 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
534 PayloadType::try_from(req.payload_type).expect("checked payload type")
535 }
536 Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
537 };
538 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
539 Ok(()) => HostOperationResult::Done,
540 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
541 }
542 }
543 }
544}
545
546/// Map `ActrError` to a stable, unique numeric code for the wire
547/// `ErrorResponse`. Each variant has a distinct code so
548/// `wire_code_to_actr_error` can reconstruct the exact variant on the
549/// receiving side.
550///
551/// Code allocation (100xx namespace to avoid collisions with HTTP):
552///
553/// | code | variant |
554/// |-------|--------------------|
555/// | 10001 | Unavailable |
556/// | 10002 | TimedOut |
557/// | 10003 | NotFound |
558/// | 10004 | PermissionDenied |
559/// | 10005 | InvalidArgument |
560/// | 10006 | UnknownRoute |
561/// | 10007 | DependencyNotFound |
562/// | 10008 | DecodeFailure |
563/// | 10009 | NotImplemented |
564/// | 10010 | Internal |
565/// | 10011 | ConnectionNotReady |
566pub(crate) fn protocol_error_to_code(err: &ActrError) -> u32 {
567 match err {
568 ActrError::Unavailable(_) => 10001,
569 ActrError::ConnectionNotReady(_) => 10011,
570 ActrError::TimedOut => 10002,
571 ActrError::NotFound(_) => 10003,
572 ActrError::PermissionDenied(_) => 10004,
573 ActrError::InvalidArgument(_) => 10005,
574 ActrError::UnknownRoute(_) => 10006,
575 ActrError::DependencyNotFound { .. } => 10007,
576 ActrError::DecodeFailure(_) => 10008,
577 ActrError::NotImplemented(_) => 10009,
578 ActrError::Internal(_) => 10010,
579 }
580}
581
582/// Reconstruct an `ActrError` from a wire code + message pair.
583///
584/// This is the inverse of `protocol_error_to_code`. Unknown codes fall
585/// back to `ActrError::Unavailable` to avoid silent data loss.
586pub(crate) fn wire_code_to_actr_error(code: u32, message: String) -> ActrError {
587 match code {
588 10001 => ActrError::Unavailable(message),
589 10002 => ActrError::TimedOut,
590 10003 => ActrError::NotFound(message),
591 10004 => ActrError::PermissionDenied(message),
592 10005 => ActrError::InvalidArgument(message),
593 10006 => ActrError::UnknownRoute(message),
594 10007 => ActrError::DependencyNotFound {
595 service_name: String::new(),
596 message,
597 },
598 10008 => ActrError::DecodeFailure(message),
599 10009 => ActrError::NotImplemented(message),
600 10010 => ActrError::Internal(message),
601 10011 => ActrError::ConnectionNotReady(ConnectionNotReadyInfo {
602 retry_after_ms: parse_connection_not_ready_retry_hint(&message),
603 }),
604 // Legacy HTTP-ish codes emitted before this scheme was introduced,
605 // or any unknown future code: treat as Unavailable.
606 _ => ActrError::Unavailable(format!("rpc error {code}: {message}")),
607 }
608}
609
/// Best-effort extraction of `retry_after_ms` from a `ConnectionNotReady`
/// display string.
///
/// `ErrorResponse` only carries a code and a display string, so the retry
/// hint is recovered by looking for the first `retry_after_ms=Some(<n>)`
/// fragment. The parser is intentionally narrow: any other shape (missing
/// marker, missing `)`, non-numeric or out-of-range value) yields `None`.
fn parse_connection_not_ready_retry_hint(message: &str) -> Option<u64> {
    const MARKER: &str = "retry_after_ms=Some(";

    let (_, after_marker) = message.split_once(MARKER)?;
    let (digits, _) = after_marker.split_once(')')?;
    digits.parse::<u64>().ok()
}
619
620impl Inner {
621 #[allow(dead_code)]
622 pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
623 self.package_manifest.as_ref()
624 }
625
626 /// Network event processing loop (background task)
627 ///
628 /// # Responsibilities
629 /// - Receive network events from Channel
630 /// - Delegate to NetworkEventProcessor for handling
631 /// - Record processing time and send results
632 async fn network_event_loop(
633 event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>,
634 event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
635 shutdown_token: CancellationToken,
636 ) {
637 crate::lifecycle::network_event::run_network_event_reconciler(
638 event_rx,
639 event_processor,
640 shutdown_token,
641 )
642 .await;
643 }
644
645 fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
646 if timeout_ms > 0 {
647 Duration::from_millis(timeout_ms as u64)
648 } else {
649 DEDUP_TTL
650 }
651 }
652
653 async fn wait_for_inflight_duplicate(
654 mut waiter: DedupWaiter,
655 timeout: Duration,
656 ) -> ActorResult<Bytes> {
657 let wait_for_result = async {
658 loop {
659 if let Some(result) = waiter.borrow().clone() {
660 return result;
661 }
662
663 if waiter.changed().await.is_err() {
664 if let Some(result) = waiter.borrow().clone() {
665 return result;
666 }
667 return Err(ActrError::Unavailable(
668 "duplicate request result unavailable".to_string(),
669 ));
670 }
671 }
672 };
673
674 match tokio::time::timeout(timeout, wait_for_result).await {
675 Ok(result) => result,
676 Err(_) => Err(ActrError::Unavailable(format!(
677 "duplicate request in-flight timed out after {}ms",
678 timeout.as_millis()
679 ))),
680 }
681 }
682
683 /// - Single-hop calls: effectively identical
684 /// - Multi-hop calls: trace_id spans all hops, request_id per hop
685 #[cfg_attr(
686 feature = "opentelemetry",
687 tracing::instrument(
688 skip_all,
689 name = "ActrNode.handle_incoming",
690 fields(
691 actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
692 route_key = %envelope.route_key,
693 request_id = %envelope.request_id,
694 )
695 )
696 )]
697 pub async fn handle_incoming(
698 &self,
699 envelope: RpcEnvelope,
700 caller_id: Option<&ActrId>,
701 ) -> ActorResult<Bytes> {
702 // Log received message
703 if let Some(caller) = caller_id {
704 tracing::debug!(
705 "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
706 envelope.route_key,
707 caller,
708 envelope.request_id
709 );
710 } else {
711 tracing::debug!(
712 "📨 Handling incoming message: route_key={}, request_id={}",
713 envelope.route_key,
714 envelope.request_id
715 );
716 }
717
718 // 0. Get actor_id early for ACL check
719 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
720 ActrError::Internal(
721 "Actor ID not set - node must be started before handling messages".to_string(),
722 )
723 })?;
724
725 // 0.1. ACL Permission Check (before processing message)
726 let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
727 .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
728
729 if !acl_allowed {
730 tracing::warn!(
731 severity = 5,
732 error_category = "acl_denied",
733 request_id = %envelope.request_id,
734 route_key = %envelope.route_key,
735 caller = %caller_id
736 .map(|c| c.to_string())
737 .unwrap_or_else(|| "<none>".to_string()),
738 "🚫 ACL: Permission denied"
739 );
740
741 return Err(ActrError::PermissionDenied(format!(
742 "ACL denied: {} is not allowed to call {}",
743 caller_id
744 .map(|c| c.to_string())
745 .unwrap_or_else(|| "<unknown>".to_string()),
746 actor_id
747 )));
748 }
749
750 // 0.2. Deduplication: return cached response for retried request_ids
751 let outcome = {
752 self.dedup_state
753 .lock()
754 .await
755 .check_or_mark(&envelope.request_id)
756 };
757 match outcome {
758 DedupOutcome::Fresh => {} // proceed normally
759 DedupOutcome::InFlight(waiter) => {
760 tracing::debug!(
761 request_id = %envelope.request_id,
762 route_key = %envelope.route_key,
763 "duplicate request in-flight; waiting for original result"
764 );
765 return Self::wait_for_inflight_duplicate(
766 waiter,
767 Self::duplicate_wait_timeout(envelope.timeout_ms),
768 )
769 .await;
770 }
771 DedupOutcome::Duplicate(cached) => {
772 tracing::debug!(
773 request_id = %envelope.request_id,
774 route_key = %envelope.route_key,
775 "♻️ returning cached response for duplicate request_id"
776 );
777 return cached;
778 }
779 }
780
781 // 1. Create Context with caller_id from transport layer
782 let credential_state = self.credential_state.clone().ok_or_else(|| {
783 ActrError::Internal(
784 "Credential not set - node must be started before handling messages".to_string(),
785 )
786 })?;
787 let ctx = self.make_runtime_context(
788 actor_id,
789 caller_id, // caller_id from transport layer (MessageRecord.from)
790 &envelope.request_id,
791 &credential_state.credential().await,
792 );
793
794 // 2. Dispatch
795 let dispatch_ctx = crate::workload::InvocationContext {
796 self_id: actor_id.clone(),
797 caller_id: caller_id.cloned(),
798 request_id: envelope.request_id.clone(),
799 };
800 let ctx_for_executor = ctx.clone();
801 let workload_for_executor = self.workload_dispatch.clone();
802 let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
803 let ctx = ctx_for_executor.clone();
804 let workload_dispatch = workload_for_executor.clone();
805 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
806 });
807
808 let mut guard = self.workload_dispatch.lock().await;
809 let result = guard
810 .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
811 .await;
812
813 match &result {
814 Ok(_) => tracing::debug!(
815 request_id = %envelope.request_id,
816 route_key = %envelope.route_key,
817 "✅ Message handled successfully"
818 ),
819 Err(e) => tracing::error!(
820 severity = 6,
821 error_category = "handler_error",
822 request_id = %envelope.request_id,
823 route_key = %envelope.route_key,
824 "❌ Message handling failed: {:?}", e
825 ),
826 }
827
828 // 3. Store completed result in dedup cache before returning
829 self.dedup_state
830 .lock()
831 .await
832 .complete(&envelope.request_id, result.clone());
833
834 result
835 }
836
    /// Build a new `Inner` from config and runtime workload.
    ///
    /// This is the internal constructor behind the public node builders and
    /// Hyper package attach helpers. It sets up:
    /// - a SQLite mailbox at `config.mailbox_path`, or `":memory:"` when unset;
    /// - a standalone SQLite dead letter queue at `<mailbox_path>.dlq`, or in
    ///   memory when the mailbox is in memory;
    /// - a `WebSocketSignalingClient` for `config.signaling_url`, whose reconnect
    ///   manager is started here;
    /// - the bidirectional inproc transports and the inproc `Gate` built on
    ///   `shell_to_workload`;
    /// - empty data-stream / media-frame registries and dedup state.
    ///
    /// Registration- and WebRTC-dependent fields (`actor_id`,
    /// `credential_state`, `outproc_gate`, WebRTC / WebSocket gates,
    /// coordinator, peer transport) start as `None`.
    ///
    /// # Errors
    /// Returns `ActrError::Unavailable` if the mailbox or DLQ database cannot be
    /// initialized.
    pub(crate) async fn build(
        config: actr_config::RuntimeConfig,
        workload: crate::workload::Workload,
        package_manifest: Option<actr_pack::PackageManifest>,
        packaged_lock: Option<actr_config::lock::LockFile>,
        mailbox_backpressure_threshold: usize,
        credential_expiry_warning: Duration,
    ) -> ActorResult<Self> {
        use crate::outbound::{Gate, HostGate};
        use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};

        tracing::info!("🚀 Initializing ActrNode");

        // Initialize Mailbox (falls back to an in-memory SQLite database when
        // no path is configured)
        let mailbox_path = config
            .mailbox_path
            .as_ref()
            .map(|p| p.to_string_lossy().to_string())
            .unwrap_or_else(|| ":memory:".to_string());

        tracing::info!("📂 Mailbox database path: {}", mailbox_path);

        let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
            actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
                .await
                .map_err(|e| {
                    actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
                })?,
        );

        // Initialize Dead Letter Queue next to the mailbox; an in-memory
        // mailbox gets an in-memory DLQ.
        let dlq_path = if mailbox_path == ":memory:" {
            ":memory:".to_string()
        } else {
            format!("{mailbox_path}.dlq")
        };

        let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
            actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
                .await
                .map_err(|e| {
                    actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
                })?,
        );
        tracing::info!("✅ Dead Letter Queue initialized");

        // Initialize signaling client.
        // Request the "answer" WebRTC role only when the config prefers answering.
        let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
            Some("answer".to_string())
        } else {
            None
        };

        let signaling_config = SignalingConfig {
            server_url: config.signaling_url.clone(),
            connection_timeout: 30,
            heartbeat_interval: 30,
            reconnect_config: ReconnectConfig::default(),
            auth_config: None,
            webrtc_role,
        };

        // The reconnect manager is started here, at construction time.
        let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
        client.start_reconnect_manager();
        let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;

        // Initialize inproc infrastructure (Shell ↔ Guest)
        let shell_to_workload = Arc::new(HostTransport::new());
        let workload_to_shell = Arc::new(HostTransport::new());
        let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));

        let data_stream_registry = Arc::new(DataStreamRegistry::new());
        let media_frame_registry = Arc::new(MediaFrameRegistry::new());

        tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");

        // A missing lock file is tolerated: fingerprint lookups are simply unavailable.
        let actr_lock = if let Some(lock) = packaged_lock {
            tracing::info!(
                "📋 Loaded packaged manifest.lock.toml with {} dependencies",
                lock.dependencies.len()
            );
            Some(Arc::new(lock))
        } else {
            tracing::warn!(
                "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
            );
            None
        };

        tracing::info!("✅ ActrNode initialized");

        Ok(Self {
            config,
            mailbox,
            dlq,
            inproc_gate,
            outproc_gate: None, // Populated in start() once WebRTC / PeerGate is ready.
            data_stream_registry,
            media_frame_registry,
            signaling_client,
            actor_id: None,
            credential_state: None,
            webrtc_coordinator: None,
            peer_transport: None,
            webrtc_gate: None,
            websocket_gate: None,
            shell_to_workload: Some(shell_to_workload),
            workload_to_shell: Some(workload_to_shell),
            shutdown_token: CancellationToken::new(),
            actr_lock,
            network_event_rx: None,
            network_event_debounce_config: None,
            dedup_state: Arc::new(Mutex::new(DedupState::new())),
            package_manifest,
            preregistered_credential: None,
            discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
                std::collections::HashMap::new(),
            )),
            workload_dispatch: Arc::new(Mutex::new(workload)),
            hook_observer: None,
            mailbox_backpressure_threshold,
            credential_expiry_warning,
        })
    }
965
966 /// Snapshot the current runtime handles into a `BootstrapContextBuilder`.
967 ///
968 /// The returned builder is cloned into long-lived hook closures and into
969 /// `ActrRefShared` so those paths can materialize bootstrap contexts
970 /// without retaining a reference back to `Inner`. The snapshot freezes
971 /// `outproc_gate` and `actr_lock` at call time — callers that want to
972 /// observe a later-initialized `outproc_gate` must rebuild.
973 pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
974 BootstrapContextBuilder::new(
975 self.inproc_gate.clone(),
976 self.outproc_gate.clone(),
977 self.data_stream_registry.clone(),
978 self.media_frame_registry.clone(),
979 self.signaling_client.clone(),
980 self.actr_lock.clone(),
981 self.discovered_ws_addresses.clone(),
982 )
983 }
984
985 /// Build a `RuntimeContext` for the per-request dispatch path.
986 ///
987 /// Unlike `BootstrapContextBuilder::build_bootstrap`, this carries the
988 /// envelope's caller identity and request id through into the context.
989 pub(crate) fn make_runtime_context(
990 &self,
991 self_id: &ActrId,
992 caller_id: Option<&ActrId>,
993 request_id: &str,
994 credential: &AIdCredential,
995 ) -> RuntimeContext {
996 RuntimeContext::new(
997 self_id.clone(),
998 caller_id.cloned(),
999 request_id.to_string(),
1000 self.inproc_gate.clone(),
1001 self.outproc_gate.clone(),
1002 self.data_stream_registry.clone(),
1003 self.media_frame_registry.clone(),
1004 self.signaling_client.clone(),
1005 credential.clone(),
1006 self.actr_lock.clone(),
1007 self.discovered_ws_addresses.clone(),
1008 )
1009 }
1010
1011 /// Create network event processing infrastructure (called on demand, before `start()`).
1012 ///
1013 /// # Parameters
1014 /// - `debounce_ms`: Debounce window in milliseconds. If 0, no debounce.
1015 ///
1016 /// # Panics
1017 /// Panics if called more than once.
1018 pub fn create_network_event_handle(
1019 &mut self,
1020 debounce_ms: u64,
1021 ) -> crate::lifecycle::NetworkEventHandle {
1022 if self.network_event_rx.is_some() {
1023 panic!("create_network_event_handle() can only be called once");
1024 }
1025
1026 let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
1027
1028 let debounce_config = if debounce_ms > 0 {
1029 Some(crate::lifecycle::network_event::DebounceConfig {
1030 window: std::time::Duration::from_millis(debounce_ms),
1031 })
1032 } else {
1033 None
1034 };
1035
1036 self.network_event_rx = Some(event_rx);
1037 self.network_event_debounce_config = debounce_config;
1038
1039 tracing::info!(
1040 debounce_ms,
1041 channel_capacity = 100_u64,
1042 "network_event.node.handle_created"
1043 );
1044
1045 crate::lifecycle::NetworkEventHandle::new(event_tx)
1046 }
1047
1048 /// Attach a credential already issued by AIS so that `start()` can skip
1049 /// the signaling registration step.
1050 ///
1051 /// Called by the Hyper layer between `Hyper::register()` and `Hyper::start()`.
1052 pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
1053 tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
1054 self.preregistered_credential = Some(register_ok);
1055 }
1056
1057 /// Start the system
1058 pub async fn start(mut self) -> ActorResult<ActrRef> {
1059 tracing::info!("🚀 Starting ActrNode");
1060 tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1061
1062 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1063 // 1. Build RegisterRequest
1064 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1065 // Get ActrType from configuration
1066 let actr_type = self.config.actr_type().clone();
1067 tracing::info!("📋 Actor type: {}", actr_type);
1068
1069 // ServiceSpec is derived by the Hyper layer from the verified package
1070 // (see `service_spec::calculate_service_spec_from_package`). The raw
1071 // ActrNode::start() path has no package context and always sends None
1072 // on its own RegisterRequest; callers that need a spec must go
1073 // through `Hyper::register()`.
1074 let service_spec = None;
1075
1076 // If a WebSocket listen port is configured, build the advertised ws:// address
1077 // to register with the signaling server so clients can discover it.
1078 let ws_address = if let Some(port) = self.config.websocket_listen_port {
1079 let host = self
1080 .config
1081 .websocket_advertised_host
1082 .as_deref()
1083 .unwrap_or("127.0.0.1");
1084 Some(format!("ws://{}:{}", host, port))
1085 } else {
1086 None
1087 };
1088
1089 if let Some(ref addr) = ws_address {
1090 tracing::info!(
1091 "📡 Advertising WebSocket address to signaling server: {}",
1092 addr
1093 );
1094 }
1095
1096 let register_request = RegisterRequest {
1097 actr_type: actr_type.clone(),
1098 realm: self.config.realm,
1099 service_spec,
1100 acl: self.config.acl.clone(),
1101 service: None,
1102 ws_address,
1103 auth_mode: Some(RegisterAuthMode::Linked as i32),
1104 ..Default::default()
1105 };
1106
1107 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1108 // 1. Obtain registration info (Hyper pre-injected or AIS HTTP)
1109 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1110 let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1111 tracing::info!(
1112 "Using Hyper pre-injected registration credential; skipping AIS registration"
1113 );
1114 injected
1115 } else {
1116 let ais_endpoint = &self.config.ais_endpoint;
1117 tracing::info!(
1118 ais_endpoint = %ais_endpoint,
1119 "Registering actor with AIS via HTTP"
1120 );
1121 let mut ais = AisClient::new(ais_endpoint);
1122 if let Some(ref secret) = self.config.realm_secret {
1123 ais = ais.with_realm_secret(secret);
1124 }
1125 let resp = ais
1126 .register_linked(register_request.clone())
1127 .await
1128 .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1129 match resp.result {
1130 Some(register_response::Result::Success(ok)) => {
1131 tracing::info!("✅ AIS HTTP registration successful");
1132 ok
1133 }
1134 Some(register_response::Result::Error(error)) => {
1135 tracing::error!(
1136 severity = 10,
1137 error_category = "registration_error",
1138 error_code = error.code,
1139 "❌ AIS registration failed: code={}, message={}",
1140 error.code,
1141 error.message
1142 );
1143 return Err(ActrError::Unavailable(format!(
1144 "AIS registration rejected: {} (code: {})",
1145 error.message, error.code
1146 )));
1147 }
1148 None => {
1149 tracing::error!(
1150 severity = 10,
1151 error_category = "registration_error",
1152 "❌ AIS registration response missing result"
1153 );
1154 return Err(ActrError::Unavailable(
1155 "Invalid AIS registration response: missing result".to_string(),
1156 ));
1157 }
1158 }
1159 };
1160
1161 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1162 // 3. Set credential on signaling client, then connect signaling WS
1163 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1164 // The signaling server requires credential params in the WS URL for
1165 // authentication. We must set actor_id + credential BEFORE connecting
1166 // so that build_url_with_identity() includes them in the query string.
1167 let pre_connect_credential_state = {
1168 let actor_id = register_ok.actr_id.clone();
1169 let credential_state = CredentialState::new(
1170 register_ok.credential.clone(),
1171 register_ok.credential_expires_at,
1172 Some(register_ok.turn_credential.clone()),
1173 );
1174 self.signaling_client.set_actor_id(actor_id).await;
1175 self.signaling_client
1176 .set_credential_state(credential_state.clone())
1177 .await;
1178 credential_state
1179 };
1180
1181 // Install the signaling-side hook callback so that
1182 // SignalingConnectStart / Connected / Disconnected events flow
1183 // through the framework tracing defaults and into a
1184 // user-installed observer. Done BEFORE connect() so the initial
1185 // attempt produces a SignalingConnectStart event.
1186 {
1187 let actor_id = register_ok.actr_id.clone();
1188 let credential_state = pre_connect_credential_state.clone();
1189 // Snapshot at this point — outproc_gate is still None here, so
1190 // signaling-event contexts will carry None for outproc_gate
1191 // (matching the pre-existing behavior prior to B13 refactor).
1192 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1193 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1194 let snapshot = ctx_builder_snapshot.clone();
1195 let actor_id = actor_id.clone();
1196 let credential_state = credential_state.clone();
1197 Box::pin(async move {
1198 Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1199 })
1200 });
1201 let cb = crate::lifecycle::hooks::build_hook_callback(
1202 self.hook_observer.clone(),
1203 ctx_builder,
1204 );
1205 self.signaling_client.set_hook_callback(cb);
1206 }
1207
1208 tracing::info!("📡 Connecting to signaling server (with credential)");
1209 self.signaling_client
1210 .connect()
1211 .await
1212 .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1213 tracing::info!("✅ Connected to signaling server");
1214
1215 // Collect background task handles so they can be managed by ActrRefShared later.
1216 let mut task_handles = Vec::new();
1217
1218 // Node-level hook callback, built inside the registration
1219 // setup block below and published back out into this wider
1220 // scope so the mailbox backpressure watchdog can subscribe.
1221 let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1222
1223 {
1224 let actor_id = register_ok.actr_id;
1225 let credential = register_ok.credential;
1226
1227 tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1228 tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1229 tracing::info!(
1230 "💓 Signaling heartbeat interval: {} seconds",
1231 register_ok.signaling_heartbeat_interval_secs
1232 );
1233
1234 // TurnCredential is a required field; should always be present under normal registration.
1235 tracing::debug!("TurnCredential received, TURN authentication ready");
1236
1237 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1238 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1239 }
1240
1241 // Store ActrId and credential state
1242 self.actor_id = Some(actor_id.clone());
1243 let credential_state = CredentialState::new(
1244 credential,
1245 register_ok.credential_expires_at,
1246 Some(register_ok.turn_credential.clone()),
1247 );
1248 self.credential_state = Some(credential_state.clone());
1249
1250 // Build the node-level lifecycle hook callback once: it is
1251 // reused for the initial `on_credential_renewed`, handed to
1252 // the heartbeat task for subsequent credential events, and
1253 // handed to the mailbox backpressure watchdog for
1254 // `on_mailbox_backpressure` on rising-edge crossings.
1255 //
1256 // The signaling layer already has its own callback installed
1257 // above — this second callback only carries credential and
1258 // mailbox-backpressure events, so no overlap with the
1259 // signaling-event plumbing.
1260 node_hook_callback =
1261 {
1262 let actor_id_for_hook = actor_id.clone();
1263 let credential_state_for_hook = credential_state.clone();
1264 // Snapshot at this point — outproc_gate is still None
1265 // here; credential / mailbox hook contexts inherit that
1266 // and therefore cannot issue Dest::Actor(_) calls (same
1267 // behavior as before B13 refactor).
1268 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1269 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1270 Arc::new(move || {
1271 let snapshot = ctx_builder_snapshot.clone();
1272 let actor_id = actor_id_for_hook.clone();
1273 let credential_state = credential_state_for_hook.clone();
1274 Box::pin(async move {
1275 Some(snapshot.build_bootstrap(
1276 &actor_id,
1277 &credential_state.credential().await,
1278 ))
1279 })
1280 });
1281 Some(crate::lifecycle::hooks::build_hook_callback(
1282 self.hook_observer.clone(),
1283 ctx_builder,
1284 ))
1285 };
1286
1287 // Fire `on_credential_renewed` at initial registration: the
1288 // credential is considered "renewed" from "nothing" to the
1289 // value just issued by AIS. Subsequent renewals fire the
1290 // same hook from `lifecycle::heartbeat`.
1291 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1292 let new_expiry = std::time::UNIX_EPOCH
1293 + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1294 if let Some(cb) = node_hook_callback.as_ref() {
1295 cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1296 } else {
1297 tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1298 }
1299 }
1300
1301 // Note: actor_id and credential_state were already set on signaling_client
1302 // before connect (step 3 above), so reconnect URLs already carry correct auth.
1303
1304 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1305 // 1.3. Inproc transports were filled in during `build()`; nothing
1306 // to stage here now that ContextFactory has been removed.
1307 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1308 tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1309
1310 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1311 // 1.5. Create WebRTC infrastructure
1312 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1313 tracing::info!("🌐 Initializing WebRTC infrastructure");
1314
1315 let media_frame_registry = self.media_frame_registry.clone();
1316
1317 // Create WebRtcCoordinator
1318 let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1319 actor_id.clone(),
1320 credential_state.clone(),
1321 self.signaling_client.clone(),
1322 self.config.webrtc.clone(),
1323 media_frame_registry,
1324 ));
1325
1326 // Install the WebRTC hook callback — fires
1327 // WebRtcConnectStart / Connected (with relayed info) /
1328 // Disconnected HookEvents on every peer state change.
1329 {
1330 let actor_id_for_hook = actor_id.clone();
1331 let credential_state_for_hook = credential_state.clone();
1332 // Snapshot before outproc_gate is wired up (just below). This
1333 // preserves the pre-refactor behavior where WebRTC-event
1334 // hook contexts carry outproc_gate = None.
1335 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1336 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1337 Arc::new(move || {
1338 let snapshot = ctx_builder_snapshot.clone();
1339 let actor_id = actor_id_for_hook.clone();
1340 let credential_state = credential_state_for_hook.clone();
1341 Box::pin(async move {
1342 Some(
1343 snapshot.build_bootstrap(
1344 &actor_id,
1345 &credential_state.credential().await,
1346 ),
1347 )
1348 })
1349 });
1350 let cb = crate::lifecycle::hooks::build_hook_callback(
1351 self.hook_observer.clone(),
1352 ctx_builder,
1353 );
1354 coordinator.set_hook_callback(cb);
1355 }
1356
1357 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1358 // 1.6. Create PeerTransport + PeerGate (new architecture)
1359 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1360 tracing::info!("🏗️ Creating PeerTransport with WebRTC support");
1361
1362 // Pre-allocate the pending-requests map so it can be shared between
1363 // DefaultWireBuilder (for outbound WS response reader tasks) and
1364 // PeerGate (for request/response matching).
1365 let pending_requests: crate::outbound::PendingRequestsMap =
1366 Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
1367
1368 // Create DefaultWireBuilder with WebRTC coordinator
1369 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1370
1371 // WebSocket channel always enabled: target ws:// address is fully discovered at runtime
1372 // Direct-connect mode: encode local node ActrId as hex, sent as X-Actr-Node-Id
1373 let local_id_hex = hex::encode(actor_id.encode_to_vec());
1374 let wire_builder_config = DefaultWireBuilderConfig {
1375 local_id_hex,
1376 enable_webrtc: true,
1377 enable_websocket: true,
1378 // Share the discovered_ws_addresses map so that post-discovery calls
1379 // can use the signaling-provided ws:// URL for this actor node.
1380 discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1381 // Pass credential_state so outbound WS handshake carries X-Actr-Credential,
1382 // enabling peer WebSocketGate to perform Ed25519 signature verification.
1383 credential_state: Some(credential_state.clone()),
1384 // Pass pending_requests so outbound WS connections spawn reader tasks
1385 // to deliver server responses back to `send_request_with_type` futures.
1386 pending_requests: Some(pending_requests.clone()),
1387 };
1388 let wire_builder = Arc::new(DefaultWireBuilder::new(
1389 Some(coordinator.clone()),
1390 wire_builder_config,
1391 ));
1392
1393 // Create PeerTransport
1394 use crate::transport::PeerTransport;
1395 let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1396 self.peer_transport = Some(transport_manager.clone());
1397
1398 // Create PeerGate with the pre-allocated pending_requests map and WebRTC coordinator.
1399 use crate::outbound::PeerGate;
1400 let outproc_gate = Arc::new(PeerGate::with_pending_requests(
1401 transport_manager,
1402 Some(coordinator.clone()),
1403 pending_requests.clone(),
1404 ));
1405 let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1406 tracing::info!("PeerTransport + PeerGate initialized");
1407
1408 let data_stream_registry = self.data_stream_registry.clone();
1409
1410 // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1411 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1412 coordinator.clone(),
1413 pending_requests,
1414 data_stream_registry.clone(),
1415 ));
1416 // Set local_id
1417 gate.set_local_id(actor_id.clone()).await;
1418 tracing::info!(
1419 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1420 );
1421
1422 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1423 // 1.7. Wire the outproc gate into Inner so subsequent
1424 // `make_runtime_context` / `bootstrap_ctx_builder` calls
1425 // observe it. All per-request contexts created by
1426 // `handle_incoming` go through this field live.
1427 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1428 tracing::info!("🔧 Wiring outproc_gate into node");
1429 self.outproc_gate = Some(outproc_gate_enum);
1430 tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1431
1432 // Save references
1433 self.webrtc_coordinator = Some(coordinator.clone());
1434 self.webrtc_gate = Some(gate.clone());
1435 tracing::info!("✅ WebRTC infrastructure initialized");
1436
1437 // Fire `on_start` once the runtime context can see the initialized
1438 // gates, before starting request-accepting/background loops. Its
1439 // Err/panic aborts Node::start.
1440 {
1441 let startup_ctx = self
1442 .bootstrap_ctx_builder()
1443 .build_bootstrap(&actor_id, &credential_state.credential().await);
1444 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1445 let call_executor =
1446 lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1447 let mut workload = self.workload_dispatch.lock().await;
1448 crate::lifecycle::hooks::call_lifecycle_hook(
1449 "on_start",
1450 workload.on_start(startup_ctx, invocation, &call_executor),
1451 )
1452 .await?;
1453 }
1454
1455 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1456 // 1.7.6. WebSocket Server (direct-connect mode, optional)
1457 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1458 if let Some(listen_port) = self.config.websocket_listen_port {
1459 tracing::info!(
1460 "🔌 WebSocket direct-connect mode enabled, binding port {}",
1461 listen_port
1462 );
1463 use crate::key_cache::AisKeyCache;
1464 use crate::wire::websocket::gate::WsAuthContext;
1465 use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1466
1467 // Build AisKeyCache and seed it with the signing key from the registration response
1468 let ais_key_cache = AisKeyCache::new();
1469 if !register_ok.signing_pubkey.is_empty() {
1470 match ais_key_cache
1471 .seed(register_ok.signing_key_id, ®ister_ok.signing_pubkey)
1472 .await
1473 {
1474 Ok(()) => tracing::info!(
1475 key_id = register_ok.signing_key_id,
1476 "🔑 AisKeyCache seeded from RegisterOk"
1477 ),
1478 Err(e) => tracing::warn!(
1479 key_id = register_ok.signing_key_id,
1480 error = ?e,
1481 "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1482 ),
1483 }
1484 } else {
1485 tracing::warn!(
1486 "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1487 );
1488 }
1489
1490 let auth_ctx = WsAuthContext {
1491 ais_key_cache,
1492 actor_id: actor_id.clone(),
1493 credential_state: credential_state.clone(),
1494 signaling_client: self.signaling_client.clone(),
1495 };
1496
1497 match WebSocketServer::bind(listen_port).await {
1498 Ok((ws_server, conn_rx)) => {
1499 ws_server.start(self.shutdown_token.clone());
1500 let ws_gate = Arc::new(WebSocketGate::new(
1501 conn_rx,
1502 outproc_gate.get_pending_requests(),
1503 data_stream_registry.clone(),
1504 Some(auth_ctx),
1505 ));
1506
1507 // Install the WebSocket peer-lifecycle hook.
1508 {
1509 let actor_id_for_hook = actor_id.clone();
1510 let credential_state_for_hook = credential_state.clone();
1511 // Snapshot taken after outproc_gate is live: ws
1512 // peer-lifecycle hook contexts can issue
1513 // Dest::Actor(_) calls.
1514 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1515 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1516 Arc::new(move || {
1517 let snapshot = ctx_builder_snapshot.clone();
1518 let actor_id = actor_id_for_hook.clone();
1519 let credential_state = credential_state_for_hook.clone();
1520 Box::pin(async move {
1521 Some(snapshot.build_bootstrap(
1522 &actor_id,
1523 &credential_state.credential().await,
1524 ))
1525 })
1526 });
1527 let cb = crate::lifecycle::hooks::build_hook_callback(
1528 self.hook_observer.clone(),
1529 ctx_builder,
1530 );
1531 ws_gate.set_hook_callback(cb);
1532 }
1533
1534 self.websocket_gate = Some(ws_gate);
1535 tracing::info!(
1536 "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1537 );
1538 }
1539 Err(e) => {
1540 tracing::error!(
1541 "❌ Failed to bind WebSocket server on port {}: {:?}",
1542 listen_port,
1543 e
1544 );
1545 }
1546 }
1547 }
1548
1549 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1550 // 1.7.5. Create shared state for credential management
1551 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1552 // Shared credential state initialized above; reused across tasks
1553
1554 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1555 // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1556 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1557 {
1558 let shutdown = self.shutdown_token.clone();
1559 let client = self.signaling_client.clone();
1560 let actor_id_for_heartbeat = actor_id.clone();
1561 let credential_state_for_heartbeat = credential_state.clone();
1562 let mailbox_for_heartbeat = self.mailbox.clone();
1563 let register_request_for_heartbeat = register_request.clone();
1564 let webrtc_coordinator_for_heartbeat = self.webrtc_coordinator.clone();
1565 let webrtc_gate_for_heartbeat = self.webrtc_gate.clone();
1566
1567 // Use interval from registration response, default to 30s
1568 let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1569 let heartbeat_interval = if heartbeat_interval_secs > 0 {
1570 Duration::from_secs(heartbeat_interval_secs as u64)
1571 } else {
1572 Duration::from_secs(30)
1573 };
1574 let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1575 let realm_secret_for_heartbeat = self.config.realm_secret.clone();
1576 let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1577 shutdown,
1578 client,
1579 actor_id_for_heartbeat,
1580 credential_state_for_heartbeat,
1581 mailbox_for_heartbeat,
1582 heartbeat_interval,
1583 register_request_for_heartbeat,
1584 ais_endpoint_for_heartbeat,
1585 realm_secret_for_heartbeat,
1586 node_hook_callback.clone(),
1587 webrtc_coordinator_for_heartbeat,
1588 webrtc_gate_for_heartbeat,
1589 ));
1590 task_handles.push(heartbeat_handle);
1591 }
1592 tracing::info!(
1593 "✅ Heartbeat task started (interval: {}s)",
1594 register_ok.signaling_heartbeat_interval_secs
1595 );
1596
1597 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1598 // 1.8.5. Spawn network event processing loop
1599 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1600 if let Some(event_rx) = self.network_event_rx.take() {
1601 use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1602
1603 // Create DefaultNetworkEventProcessor
1604 // If debounce config exists, use new_with_debounce
1605 let event_processor =
1606 if let Some(config) = self.network_event_debounce_config.clone() {
1607 Arc::new(
1608 DefaultNetworkEventProcessor::new_with_debounce_and_peer_transport(
1609 self.signaling_client.clone(),
1610 self.webrtc_coordinator.clone(),
1611 config,
1612 self.peer_transport.clone(),
1613 ),
1614 )
1615 } else {
1616 Arc::new(DefaultNetworkEventProcessor::new_with_peer_transport(
1617 self.signaling_client.clone(),
1618 self.webrtc_coordinator.clone(),
1619 self.peer_transport.clone(),
1620 ))
1621 };
1622
1623 let shutdown = self.shutdown_token.clone();
1624 let network_event_handle = tokio::spawn(async move {
1625 Self::network_event_loop(event_rx, event_processor, shutdown).await;
1626 });
1627 task_handles.push(network_event_handle);
1628 tracing::info!("network_event.node.loop_started");
1629 } else {
1630 tracing::debug!("network_event.node.loop_not_started_no_handle");
1631 }
1632
1633 {
1634 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1635 // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1636 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1637 //
1638 // This task:
1639 // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1640 // - Then sends UnregisterRequest via signaling client with a timeout
1641 //
1642 // NOTE: we push its JoinHandle into task_handles so it can be aborted
1643 // by ActrRefShared::Drop if needed.
1644 let shutdown = self.shutdown_token.clone();
1645 let client = self.signaling_client.clone();
1646 let actor_id_for_unreg = actor_id.clone();
1647 let credential_state_for_unreg = credential_state.clone();
1648 let webrtc_coordinator = self.webrtc_coordinator.clone();
1649
1650 let unregister_handle = tokio::spawn(async move {
1651 // Wait for shutdown signal
1652 shutdown.cancelled().await;
1653 tracing::info!(
1654 "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1655 actor_id_for_unreg
1656 );
1657
1658 // 1. Close all WebRTC peer connections first (if any)
1659 if let Some(coord) = webrtc_coordinator {
1660 if let Err(e) = coord.close_all_peers().await {
1661 tracing::warn!(
1662 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1663 e
1664 );
1665 } else {
1666 tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1667 }
1668 } else {
1669 tracing::debug!(
1670 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1671 );
1672 }
1673
1674 // 2. Then send UnregisterRequest with a timeout (e.g. 5 seconds)
1675 let result = tokio::time::timeout(
1676 Duration::from_secs(5),
1677 client.send_unregister_request(
1678 actor_id_for_unreg.clone(),
1679 credential_state_for_unreg.credential().await,
1680 Some("Graceful shutdown".to_string()),
1681 ),
1682 )
1683 .await;
1684 tracing::info!("UnregisterRequest result: {:?}", result);
1685 match result {
1686 Ok(Ok(_)) => {
1687 tracing::info!(
1688 "✅ UnregisterRequest sent to signaling server for Actor {}",
1689 actor_id_for_unreg
1690 );
1691 }
1692 Ok(Err(e)) => {
1693 tracing::warn!(
1694 "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1695 actor_id_for_unreg,
1696 e
1697 );
1698 }
1699 Err(_) => {
1700 tracing::warn!(
1701 "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1702 actor_id_for_unreg
1703 );
1704 }
1705 }
1706 });
1707
1708 task_handles.push(unregister_handle);
1709 }
1710 } // end registration setup block
1711
1712 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1713 // 2. Transport layer initialization (completed via WebRTC infrastructure)
1714 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1715 tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1716
1717 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1718 // 3.1 Convert to Arc (before starting background loops)
1719 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1720 // Clone actor_id before moving self into Arc
1721 let actor_id = self
1722 .actor_id
1723 .as_ref()
1724 .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1725 .clone();
1726 // Snapshot now that outproc_gate has been wired above; this builder
1727 // is shared between on_start / on_stop hooks and the ActrRefShared
1728 // handle returned to the caller.
1729 let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1730 let credential_state = self
1731 .credential_state
1732 .clone()
1733 .expect("CredentialState must be initialized in start()");
1734 let shutdown_token = self.shutdown_token.clone();
1735 let node_ref = Arc::new(self);
1736
1737 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1738 // 3.2. Register workload-level stop hook.
1739 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1740 {
1741 let node = node_ref.clone();
1742 let actor_id = actor_id.clone();
1743 let credential_state = credential_state.clone();
1744 let shutdown = shutdown_token.clone();
1745 let on_stop_handle = tokio::spawn(async move {
1746 shutdown.cancelled().await;
1747 let stop_ctx = node
1748 .bootstrap_ctx_builder()
1749 .build_bootstrap(&actor_id, &credential_state.credential().await);
1750 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1751 let call_executor =
1752 lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1753 let mut workload = node.workload_dispatch.lock().await;
1754 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1755 "on_stop",
1756 workload.on_stop(stop_ctx, invocation, &call_executor),
1757 )
1758 .await
1759 {
1760 tracing::warn!(error = %e, "workload on_stop returned Err");
1761 }
1762 });
1763 task_handles.push(on_stop_handle);
1764 }
1765
1766 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1767 // 3.5. Start WebRTC background loops
1768 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1769 tracing::info!("🚀 Starting WebRTC background loops");
1770
1771 // Start WebRtcCoordinator signaling loop
1772 if let Some(coordinator) = &node_ref.webrtc_coordinator {
1773 coordinator.clone().start().await.map_err(|e| {
1774 ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1775 })?;
1776 tracing::info!("✅ WebRtcCoordinator signaling loop started");
1777 }
1778
1779 // Start WebRtcGate message receive loop (route to Mailbox)
1780 if let Some(gate) = &node_ref.webrtc_gate {
1781 gate.start_receive_loop(node_ref.mailbox.clone())
1782 .await
1783 .map_err(|e| {
1784 ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1785 })?;
1786 tracing::info!("✅ WebRtcGate → Mailbox routing started");
1787 }
1788
1789 // Start WebSocketGate message receive loop (route to Mailbox, direct-connect mode)
1790 if let Some(ws_gate) = &node_ref.websocket_gate {
1791 ws_gate
1792 .start_receive_loop(node_ref.mailbox.clone())
1793 .await
1794 .map_err(|e| {
1795 ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1796 })?;
1797 tracing::info!("✅ WebSocketGate → Mailbox routing started");
1798 }
1799 tracing::info!("✅ WebRTC background loops started");
1800
1801 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1802 // 4.6. Start Inproc receive loop (Shell → Guest)
1803 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1804 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1805 tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1806 // Start Guest receive loop (Shell → Guest REQUEST)
1807 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1808 let node = node_ref.clone();
1809 let request_rx_lane = shell_to_workload
1810 .get_lane(PayloadType::RpcReliable, None)
1811 .await
1812 .map_err(|e| {
1813 ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1814 })?;
1815 let response_tx = workload_to_shell.clone();
1816 let shutdown = shutdown_token.clone();
1817
1818 let inproc_handle = tokio::spawn(async move {
1819 loop {
1820 tokio::select! {
1821 _ = shutdown.cancelled() => {
1822 tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1823 break;
1824 }
1825 envelope_result = request_rx_lane.recv_envelope() => {
1826 match envelope_result {
1827 Ok(envelope) => {
1828 let request_id = envelope.request_id.clone();
1829 tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1830 // Extract and set tracing context from envelope
1831 #[cfg(feature = "opentelemetry")]
1832 let span = {
1833 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1834 let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1835 set_parent_from_rpc_envelope(&span, &envelope);
1836 span
1837 };
1838
1839 // Shell calls have no caller_id (local process communication)
1840 let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1841 #[cfg(feature = "opentelemetry")]
1842 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1843
1844 match handle_incoming_fut.await {
1845 Ok(response_bytes) => {
1846 // Send RESPONSE back via workload_to_shell
1847 // Keep same route_key (no prefix needed - separate channels!)
1848 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1849 let mut response_envelope = RpcEnvelope {
1850 route_key: envelope.route_key.clone(),
1851 payload: Some(response_bytes),
1852 error: None,
1853 traceparent: None,
1854 tracestate: None,
1855 request_id: request_id.clone(),
1856 metadata: Vec::new(),
1857 timeout_ms: 30000,
1858 };
1859 // Inject tracing context
1860 #[cfg(feature = "opentelemetry")]
1861 inject_span_context_to_rpc(&span, &mut response_envelope);
1862
1863 // Send via Guest → Shell channel
1864 let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1865 #[cfg(feature = "opentelemetry")]
1866 let send_response_fut = send_response_fut.instrument(span.clone());
1867 if let Err(e) = send_response_fut.await {
1868 tracing::error!(
1869 severity = 7,
1870 error_category = "transport_error",
1871 request_id = %request_id,
1872 "❌ Failed to send RESPONSE to Shell: {:?}",
1873 e
1874 );
1875 }
1876 }
1877 Err(e) => {
1878 tracing::error!(
1879 severity = 6,
1880 error_category = "handler_error",
1881 request_id = %request_id,
1882 route_key = %envelope.route_key,
1883 "❌ Guest message handling failed: {:?}",
1884 e
1885 );
1886
1887 // Send error response (system-level error on envelope)
1888 let error_response = actr_protocol::ErrorResponse {
1889 code: protocol_error_to_code(&e),
1890 message: e.to_string(),
1891 };
1892 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1893 let mut error_envelope = RpcEnvelope {
1894 route_key: envelope.route_key.clone(),
1895 payload: None,
1896 error: Some(error_response),
1897 traceparent: envelope.traceparent.clone(),
1898 tracestate: envelope.tracestate.clone(),
1899 request_id: request_id.clone(),
1900 metadata: Vec::new(),
1901 timeout_ms: 30000,
1902 };
1903 // Inject tracing context
1904 #[cfg(feature = "opentelemetry")]
1905 inject_span_context_to_rpc(&span, &mut error_envelope);
1906
1907 let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1908 #[cfg(feature = "opentelemetry")]
1909 let send_error_response_fut = send_error_response_fut.instrument(span);
1910 if let Err(send_err) = send_error_response_fut.await {
1911 tracing::error!(
1912 severity = 7,
1913 error_category = "transport_error",
1914 request_id = %request_id,
1915 "❌ Failed to send ERROR response to Shell: {:?}",
1916 send_err
1917 );
1918 }
1919 }
1920 }
1921 }
1922 Err(e) => {
1923 tracing::error!(
1924 severity = 8,
1925 error_category = "transport_error",
1926 "❌ Failed to receive from Shell → Guest lane: {:?}",
1927 e
1928 );
1929 break;
1930 }
1931 }
1932 }
1933 }
1934 }
1935 tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1936 });
1937 task_handles.push(inproc_handle);
1938 }
1939 }
1940 tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1941
1942 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1943 // 4.7. Start Shell receive loop (Guest → Shell RESPONSE)
1944 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1945 tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1946 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1947 // Start Shell receive loop (Guest → Shell RESPONSE)
1948 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1949 let response_rx_lane = workload_to_shell
1950 .get_lane(PayloadType::RpcReliable, None)
1951 .await
1952 .map_err(|e| {
1953 ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1954 })?;
1955 let request_mgr = shell_to_workload.clone();
1956 let shutdown = shutdown_token.clone();
1957
1958 let shell_receive_handle = tokio::spawn(async move {
1959 loop {
1960 tokio::select! {
1961 _ = shutdown.cancelled() => {
1962 tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1963 break;
1964 }
1965 envelope_result = response_rx_lane.recv_envelope() => {
1966 match envelope_result {
1967 Ok(envelope) => {
1968 tracing::debug!(
1969 "📨 Shell received RESPONSE from Guest: request_id={}",
1970 envelope.request_id
1971 );
1972
1973 // Check if response is success or error
1974 match (envelope.payload, envelope.error) {
1975 (Some(payload), None) => {
1976 // Success response
1977 if let Err(e) = request_mgr
1978 .complete_response(&envelope.request_id, payload)
1979 .await
1980 {
1981 tracing::warn!(
1982 severity = 4,
1983 error_category = "orphan_response",
1984 request_id = %envelope.request_id,
1985 "⚠️ No pending request found for response: {:?}",
1986 e
1987 );
1988 }
1989 }
1990 (None, Some(error)) => {
1991 // Error response — reconstruct the precise ActrError variant
1992 // from the wire code so binding-visible classification
1993 // (UnknownRoute / PermissionDenied / TimedOut / …) is preserved
1994 // instead of collapsing every error into Unavailable.
1995 let actr_err = wire_code_to_actr_error(error.code, error.message);
1996 if let Err(e) = request_mgr
1997 .complete_error(&envelope.request_id, actr_err)
1998 .await
1999 {
2000 tracing::warn!(
2001 severity = 4,
2002 error_category = "orphan_response",
2003 request_id = %envelope.request_id,
2004 "⚠️ No pending request found for error response: {:?}",
2005 e
2006 );
2007 }
2008 }
2009 _ => {
2010 tracing::error!(
2011 severity = 7,
2012 error_category = "protocol_error",
2013 request_id = %envelope.request_id,
2014 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
2015 );
2016 }
2017 }
2018 }
2019 Err(e) => {
2020 tracing::error!(
2021 severity = 8,
2022 error_category = "transport_error",
2023 "❌ Failed to receive from Guest → Shell lane: {:?}",
2024 e
2025 );
2026 break;
2027 }
2028 }
2029 }
2030 }
2031 }
2032 tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
2033 });
2034 task_handles.push(shell_receive_handle);
2035 }
2036 }
2037 tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
2038
2039 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2040 // 4.9. Mailbox backpressure watchdog
2041 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2042 //
2043 // Emits the framework `on_mailbox_backpressure` hook once per
2044 // rising-edge crossing of the configured threshold.
2045 //
2046 // Preferred path: a push-based notification from the mailbox
2047 // backend via [`Mailbox::set_depth_observer`], which runs
2048 // synchronously on every enqueue and has zero worst-case delay.
2049 //
2050 // Fallback path: mailbox backends without depth support (or
2051 // which can't cheaply compute depth on every enqueue) keep
2052 // using a 1 Hz poll of [`Mailbox::status`].
2053 let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
2054 {
2055 use std::sync::atomic::{AtomicBool, Ordering};
2056 let mailbox = node_ref.mailbox.clone();
2057 let shutdown = shutdown_token.clone();
2058 let hook_cb = node_hook_callback.clone();
2059 let triggered = Arc::new(AtomicBool::new(false));
2060
2061 // Shared rising-edge state + hook-firing closure used by
2062 // both the push and polling code paths.
2063 let fire_if_rising = {
2064 let triggered = triggered.clone();
2065 let hook_cb = hook_cb.clone();
2066 Arc::new(move |queue_len: usize| {
2067 if queue_len >= backpressure_threshold {
2068 if !triggered.swap(true, Ordering::AcqRel) {
2069 if let Some(cb) = hook_cb.as_ref() {
2070 let cb = cb.clone();
2071 tokio::spawn(async move {
2072 cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
2073 queue_len,
2074 threshold: backpressure_threshold,
2075 })
2076 .await;
2077 });
2078 } else {
2079 tracing::warn!(
2080 queue_len,
2081 threshold = backpressure_threshold,
2082 "mailbox backpressure",
2083 );
2084 }
2085 }
2086 } else if triggered.swap(false, Ordering::AcqRel) {
2087 tracing::info!(
2088 queue_len,
2089 threshold = backpressure_threshold,
2090 "mailbox backpressure cleared",
2091 );
2092 }
2093 })
2094 };
2095
2096 // Try the push path first. The observer installs only if
2097 // the backend supports it; otherwise `installed` is `false`
2098 // and we fall through to polling.
2099 struct EnqueueObserver {
2100 fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2101 }
2102 impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2103 fn on_depth_change(&self, queued_messages: usize) {
2104 (self.fire)(queued_messages);
2105 }
2106 }
2107
2108 let installed = {
2109 let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2110 Arc::new(EnqueueObserver {
2111 fire: fire_if_rising.clone(),
2112 });
2113 mailbox.set_depth_observer(observer)
2114 };
2115
2116 if installed {
2117 tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2118 } else {
2119 tracing::debug!(
2120 "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2121 );
2122 let mailbox_for_poll = mailbox.clone();
2123 let shutdown_for_poll = shutdown.clone();
2124 let fire_for_poll = fire_if_rising.clone();
2125 let watchdog_handle = tokio::spawn(async move {
2126 let mut ticker = tokio::time::interval(Duration::from_secs(1));
2127 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2128 loop {
2129 tokio::select! {
2130 _ = shutdown_for_poll.cancelled() => {
2131 tracing::debug!(
2132 "mailbox backpressure watchdog shutting down"
2133 );
2134 break;
2135 }
2136 _ = ticker.tick() => {
2137 let status = match mailbox_for_poll.status().await {
2138 Ok(s) => s,
2139 Err(e) => {
2140 tracing::debug!(?e, "mailbox status poll failed");
2141 continue;
2142 }
2143 };
2144 fire_for_poll(status.queued_messages as usize);
2145 }
2146 }
2147 }
2148 });
2149 task_handles.push(watchdog_handle);
2150 }
2151 }
2152
2153 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2154 // 5. Start Mailbox processing loop (State Path)
2155 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2156 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2157 {
2158 let node = node_ref.clone();
2159 let mailbox = node_ref.mailbox.clone();
2160 let webrtc_gate = node_ref.webrtc_gate.clone();
2161 let ws_gate = node_ref.websocket_gate.clone();
2162 let shutdown = shutdown_token.clone();
2163
2164 let mailbox_handle = tokio::spawn(async move {
2165 loop {
2166 tokio::select! {
2167 // Listen for shutdown signal
2168 _ = shutdown.cancelled() => {
2169 tracing::info!("📭 Mailbox loop received shutdown signal");
2170 break;
2171 }
2172 // Dequeue messages (by priority)
2173 result = mailbox.dequeue() => {
2174 match result {
2175 Ok(messages) => {
2176 if messages.is_empty() {
2177 // Queue empty, sleep briefly
2178 tokio::time::sleep(Duration::from_millis(10)).await;
2179 continue;
2180 }
2181 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2182
2183 // Process messages one by one
2184 for msg_record in messages {
2185 // Deserialize RpcEnvelope (Protobuf)
2186 match RpcEnvelope::decode(&msg_record.payload[..]) {
2187 Ok(envelope) => {
2188 let request_id = envelope.request_id.clone();
2189 let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2190 tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2191
2192 tracing::debug!("📦 Processing message: request_id={}", request_id);
2193 #[cfg(feature = "opentelemetry")]
2194 let span = {
2195 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2196 let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2197 set_parent_from_rpc_envelope(&span, &envelope);
2198 span
2199 };
2200
2201 // Decode caller_id from MessageRecord.from (transport layer)
2202 let caller_id_result = ActrId::decode(&msg_record.from[..]);
2203 let caller_id_ref = caller_id_result.as_ref().ok();
2204
2205 if caller_id_ref.is_none() {
2206 tracing::warn!(
2207 request_id = %request_id,
2208 "⚠️ Failed to decode caller_id from MessageRecord.from"
2209 );
2210 }
2211
2212 // Call handle_incoming with caller_id from transport layer
2213 let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2214 #[cfg(feature = "opentelemetry")]
2215 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2216
2217 /// Send `response_envelope` back to `caller` via the
2218 /// best available transport.
2219 ///
2220 /// Priority: inbound WebSocket connection (if caller
2221 /// dialled us directly) → WebRTC gate. Returns the
2222 /// first transport error encountered, if any.
2223 async fn send_envelope_to_caller(
2224 ws_gate: &Option<Arc<crate::wire::websocket::WebSocketGate>>,
2225 webrtc_gate: &Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
2226 caller: &ActrId,
2227 response_envelope: RpcEnvelope,
2228 request_id: &str,
2229 ) {
2230 // 1. Try inbound WebSocket connection first.
2231 if let Some(wsg) = ws_gate {
2232 match wsg.send_response(caller, response_envelope.clone()).await {
2233 Ok(true) => return, // sent successfully
2234 Ok(false) => {
2235 tracing::debug!(
2236 request_id = request_id,
2237 caller = %caller,
2238 "No inbound WS connection for caller; falling back to WebRTC gate"
2239 );
2240 }
2241 Err(e) => {
2242 tracing::warn!(
2243 severity = 5,
2244 error_category = "transport_error",
2245 request_id = request_id,
2246 "WebSocketGate send_response failed, falling back: {:?}", e
2247 );
2248 }
2249 }
2250 }
2251
2252 // 2. Fall back to WebRTC gate.
2253 if let Some(gate) = webrtc_gate {
2254 if let Err(e) = gate.send_response(caller, response_envelope).await {
2255 tracing::error!(
2256 severity = 7,
2257 error_category = "transport_error",
2258 request_id = request_id,
2259 "❌ WebRtcGate send_response failed: {:?}", e
2260 );
2261 }
2262 } else {
2263 tracing::error!(
2264 severity = 7,
2265 error_category = "transport_error",
2266 request_id = request_id,
2267 "❌ No gate available to send response"
2268 );
2269 }
2270 }
2271
2272 match handle_incoming_fut.await {
2273 Ok(response_bytes) => {
2274 match caller_id_result {
2275 Ok(caller) => {
2276 // Construct response RpcEnvelope (reuse request_id!)
2277 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2278 let mut response_envelope = RpcEnvelope {
2279 request_id: request_id.clone(),
2280 route_key: envelope.route_key.clone(),
2281 payload: Some(response_bytes),
2282 error: None,
2283 traceparent: envelope.traceparent.clone(),
2284 tracestate: envelope.tracestate.clone(),
2285 metadata: Vec::new(),
2286 timeout_ms: 30000,
2287 };
2288 // Inject tracing context
2289 #[cfg(feature = "opentelemetry")]
2290 inject_span_context_to_rpc(&span, &mut response_envelope);
2291
2292 #[cfg(feature = "opentelemetry")]
2293 let send_fut = send_envelope_to_caller(
2294 &ws_gate,
2295 &webrtc_gate,
2296 &caller,
2297 response_envelope,
2298 &request_id,
2299 ).instrument(span);
2300 #[cfg(not(feature = "opentelemetry"))]
2301 let send_fut = send_envelope_to_caller(
2302 &ws_gate,
2303 &webrtc_gate,
2304 &caller,
2305 response_envelope,
2306 &request_id,
2307 );
2308 send_fut.await;
2309 }
2310 Err(e) => {
2311 tracing::error!(
2312 severity = 8,
2313 error_category = "protobuf_decode",
2314 request_id = %envelope.request_id,
2315 "❌ Failed to decode caller_id: {:?}",
2316 e
2317 );
2318 }
2319 }
2320
2321 // ACK message
2322 if let Err(e) = mailbox.ack(msg_record.id).await {
2323 tracing::error!(
2324 severity = 9,
2325 error_category = "mailbox_error",
2326 request_id = %envelope.request_id,
2327 message_id = %msg_record.id,
2328 "❌ Mailbox ACK failed: {:?}",
2329 e
2330 );
2331 }
2332 }
2333 Err(e) => {
2334 tracing::error!(
2335 severity = 6,
2336 error_category = "handler_error",
2337 request_id = %envelope.request_id,
2338 route_key = %envelope.route_key,
2339 "❌ handle_incoming failed: {:?}", e
2340 );
2341
2342 // Send error envelope back to caller so it
2343 // receives a structured error rather than
2344 // waiting until its deadline fires.
2345 if let Ok(caller) = caller_id_result {
2346 let error_response = actr_protocol::ErrorResponse {
2347 code: protocol_error_to_code(&e),
2348 message: e.to_string(),
2349 };
2350 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2351 let mut error_envelope = RpcEnvelope {
2352 request_id: request_id.clone(),
2353 route_key: envelope.route_key.clone(),
2354 payload: None,
2355 error: Some(error_response),
2356 traceparent: envelope.traceparent.clone(),
2357 tracestate: envelope.tracestate.clone(),
2358 metadata: Vec::new(),
2359 timeout_ms: 30000,
2360 };
2361 #[cfg(feature = "opentelemetry")]
2362 inject_span_context_to_rpc(&span, &mut error_envelope);
2363
2364 send_envelope_to_caller(
2365 &ws_gate,
2366 &webrtc_gate,
2367 &caller,
2368 error_envelope,
2369 &request_id,
2370 ).await;
2371 }
2372
2373 // ACK to avoid infinite retries
2374 let _ = mailbox.ack(msg_record.id).await;
2375 }
2376 }
2377 }
2378 Err(e) => {
2379 // Poison message - cannot decode RpcEnvelope
2380 tracing::error!(
2381 severity = 9,
2382 error_category = "protobuf_decode",
2383 message_id = %msg_record.id,
2384 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2385 e
2386 );
2387
2388 // Write to Dead Letter Queue
2389 use actr_runtime_mailbox::DlqRecord;
2390 use chrono::Utc;
2391 use uuid::Uuid;
2392
2393 let dlq_record = DlqRecord {
2394 id: Uuid::new_v4(),
2395 original_message_id: Some(msg_record.id.to_string()),
2396 from: Some(msg_record.from.clone()),
2397 to: node.actor_id.as_ref().map(|id| {
2398 let mut buf = Vec::new();
2399 id.encode(&mut buf).unwrap();
2400 buf
2401 }),
2402 raw_bytes: msg_record.payload.clone(),
2403 error_message: format!("Protobuf decode failed: {e}"),
2404 error_category: "protobuf_decode".to_string(),
2405 trace_id: format!("mailbox-{}", msg_record.id),
2406 request_id: None,
2407 created_at: Utc::now(),
2408 redrive_attempts: 0,
2409 last_redrive_at: None,
2410 context: Some(format!(
2411 r#"{{"source":"mailbox","priority":"{}"}}"#,
2412 match msg_record.priority {
2413 actr_runtime_mailbox::MessagePriority::High => "high",
2414 actr_runtime_mailbox::MessagePriority::Normal => "normal",
2415 }
2416 )),
2417 };
2418
2419 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2420 tracing::error!(
2421 severity = 10,
2422 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2423 dlq_err
2424 );
2425 } else {
2426 tracing::warn!(
2427 severity = 9,
2428 "☠️ Poison message moved to DLQ: message_id={}",
2429 msg_record.id
2430 );
2431 }
2432
2433 // ACK the poison message to remove from mailbox
2434 let _ = mailbox.ack(msg_record.id).await;
2435 }
2436 }
2437 }
2438 }
2439 Err(e) => {
2440 tracing::error!(
2441 severity = 9,
2442 error_category = "mailbox_error",
2443 "❌ Mailbox dequeue failed: {:?}", e
2444 );
2445 tokio::time::sleep(Duration::from_secs(1)).await;
2446 }
2447 }
2448 }
2449 }
2450 }
2451 tracing::info!("✅ Mailbox processing loop terminated gracefully");
2452 });
2453
2454 task_handles.push(mailbox_handle);
2455 }
2456 tracing::info!("✅ Mailbox processing loop started");
2457 tracing::info!("✅ ActrNode started successfully");
2458
2459 {
2460 let ready_ctx = bootstrap_ctx_builder
2461 .build_bootstrap(&actor_id, &credential_state.credential().await);
2462 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2463 let call_executor =
2464 lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2465 let mut workload = node_ref.workload_dispatch.lock().await;
2466 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2467 "on_ready",
2468 workload.on_ready(ready_ctx, invocation, &call_executor),
2469 )
2470 .await
2471 {
2472 tracing::warn!(error = %e, "workload on_ready returned Err");
2473 }
2474 }
2475
2476 // Create ActrRefShared
2477 let shared = Arc::new(ActrRefShared {
2478 actor_id,
2479 bootstrap_ctx_builder,
2480 credential_state,
2481 shutdown_token,
2482 task_handles: Mutex::new(task_handles),
2483 });
2484
2485 // Create ActrRef
2486 tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2487
2488 Ok(ActrRef { shared })
2489 }
2490}
2491
#[cfg(test)]
mod tests {
    use super::*;

    /// Extracts the `ConnectionNotReady` payload, failing the test with a
    /// descriptive panic when `err` is any other variant.
    fn expect_connection_not_ready(err: ActrError) -> ConnectionNotReadyInfo {
        match err {
            ActrError::ConnectionNotReady(info) => info,
            other => panic!("expected ConnectionNotReady, got {other:?}"),
        }
    }

    /// `ConnectionNotReady` must map to its own wire code rather than a
    /// generic one.
    #[test]
    fn connection_not_ready_has_distinct_wire_code() {
        let code = protocol_error_to_code(&ActrError::ConnectionNotReady(
            ConnectionNotReadyInfo::new(1200, 6000),
        ));

        assert_eq!(code, 10011);
    }

    /// Decoding wire code 10011 restores the variant and parses the retry hint
    /// out of the message text.
    #[test]
    fn connection_not_ready_wire_code_roundtrips_variant_and_retry_hint() {
        let message = "connection not ready: retry_after_ms=Some(4800)".to_string();
        let info = expect_connection_not_ready(wire_code_to_actr_error(10011, message));

        assert_eq!(info.retry_after_ms, Some(4800));
    }

    /// A message without a retry hint still decodes to the right variant, with
    /// no hint.
    #[test]
    fn connection_not_ready_wire_code_handles_missing_retry_hint() {
        let info = expect_connection_not_ready(wire_code_to_actr_error(
            10011,
            "connection not ready".to_string(),
        ));

        assert_eq!(info.retry_after_ms, None);
    }
}