actr_runtime/lifecycle/
actr_node.rs

1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use crate::context_factory::ContextFactory;
4use crate::transport::InprocTransportManager;
5#[cfg(feature = "opentelemetry")]
6use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
7use actr_framework::{Bytes, Workload};
8use actr_mailbox::{DeadLetterQueue, Mailbox};
9use actr_protocol::prost::Message as ProstMessage;
10use actr_protocol::{
11    AIdCredential, ActorResult, ActrId, ActrType, PayloadType, RegisterRequest,
12    RouteCandidatesRequest, RpcEnvelope, register_response, route_candidates_request,
13};
14use futures_util::FutureExt;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::RwLock;
18use tokio_util::sync::CancellationToken;
19#[cfg(feature = "opentelemetry")]
20use tracing::Instrument as _;
21// Use types from sub-crates
22use crate::wire::webrtc::SignalingClient;
23// Use extension traits from actr-protocol
24use actr_protocol::{ActrIdExt, ActrTypeExt};
25// Use heartbeat functions
26use crate::lifecycle::heartbeat::heartbeat_task;
27
28// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
29// Constants
30// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
31
32/// ActrNode - ActrSystem + Workload (1:1 composition)
33///
34/// # Generic Parameters
35/// - `W`: Workload type
36///
37/// # MessageDispatcher Association
38/// - Statically associated via W::Dispatcher
39/// - Does not store Dispatcher instance (not even ZST needed)
40/// - Dispatch calls entirely through type system
41pub struct ActrNode<W: Workload> {
42    /// Runtime configuration
43    pub(crate) config: actr_config::Config,
44
45    /// Workload instance (the only business logic)
46    pub(crate) workload: Arc<W>,
47
48    /// SQLite persistent mailbox
49    pub(crate) mailbox: Arc<dyn Mailbox>,
50
51    /// Dead Letter Queue for poison messages
52    pub(crate) dlq: Arc<dyn DeadLetterQueue>,
53
54    /// Context factory (created in start() after obtaining ActorId)
55    pub(crate) context_factory: Option<ContextFactory>,
56
57    /// Signaling client
58    pub(crate) signaling_client: Arc<dyn SignalingClient>,
59
60    /// Actor ID (obtained after startup)
61    pub(crate) actor_id: Option<ActrId>,
62
63    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
64    pub(crate) credential_state: Option<CredentialState>,
65
66    /// Pre-shared key for TURN authentication (obtained from registration)
67    pub(crate) psk: Option<Bytes>,
68
69    /// WebRTC coordinator (created after startup)
70    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,
71
72    /// WebRTC Gate (created after startup)
73    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
74
75    /// Shell → Workload Transport Manager
76    ///
77    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
78    pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,
79
80    /// Workload → Shell Transport Manager
81    ///
82    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
83    pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,
84
85    /// Shutdown token for graceful shutdown
86    pub(crate) shutdown_token: CancellationToken,
87}
88
89/// Credential state for shared access between tasks
90#[derive(Clone)]
91pub struct CredentialState {
92    inner: Arc<RwLock<CredentialStateInner>>,
93}
94
95#[derive(Clone)]
96struct CredentialStateInner {
97    credential: AIdCredential,
98    expires_at: Option<prost_types::Timestamp>,
99    /// This is updated together with credential when credential is refreshed
100    psk: Option<Bytes>,
101}
102
103impl CredentialState {
104    /// Create a new CredentialState with PSK
105    fn new(
106        credential: AIdCredential,
107        expires_at: Option<prost_types::Timestamp>,
108        psk: Option<Bytes>,
109    ) -> Self {
110        Self {
111            inner: Arc::new(RwLock::new(CredentialStateInner {
112                credential,
113                expires_at,
114                psk,
115            })),
116        }
117    }
118
119    pub async fn credential(&self) -> AIdCredential {
120        self.inner.read().await.credential.clone()
121    }
122
123    pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
124        self.inner.read().await.expires_at
125    }
126
127    /// Get the PSK for TURN authentication
128    pub async fn psk(&self) -> Option<Bytes> {
129        self.inner.read().await.psk.clone()
130    }
131
132    /// Update credential along with PSK
133    /// This should be called when credential is refreshed and a new PSK is provided
134    pub(crate) async fn update(
135        &self,
136        credential: AIdCredential,
137        expires_at: Option<prost_types::Timestamp>,
138        psk: Option<Bytes>,
139    ) {
140        let mut guard = self.inner.write().await;
141        guard.credential = credential;
142        guard.expires_at = expires_at;
143        if psk.is_some() {
144            guard.psk = psk;
145        }
146    }
147}
148
149/// Map ProtocolError to error code for ErrorResponse
150fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
151    use actr_protocol::ProtocolError;
152    match err {
153        ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
154        ProtocolError::Uri(_) => 400,  // Bad Request - URI parsing error
155        ProtocolError::Name(_) => 400, // Bad Request - invalid name
156        ProtocolError::SerializationError(_) => 500, // Internal Server Error
157        ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
158        ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
159        ProtocolError::EncodeError(_) => 500, // Internal Server Error
160        ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
161        ProtocolError::TransportError(_) => 503, // Service Unavailable
162        ProtocolError::Timeout => 504, // Gateway Timeout
163        ProtocolError::TargetNotFound(_) => 404, // Not Found
164        ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
165        ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
166    }
167}
168
169/// Check ACL permission for incoming request
170///
171/// # Arguments
172/// - `caller_id`: The ActrId of the caller (None for local calls)
173/// - `target_id`: The ActrId of the target (self)
174/// - `acl`: ACL rules from configuration
175///
176/// # Returns
177/// - `Ok(true)`: Permission granted
178/// - `Ok(false)`: Permission denied
179/// - `Err`: ACL check failed (treat as deny)
180///
181/// # ACL Evaluation Logic
182/// 1. If no caller_id (local call), always allow
183/// 2. If no ACL configured, allow by default (permissive mode for backward compatibility)
184/// 3. If ACL configured but rules are empty, deny all (secure by default)
185/// 4. Iterate through ACL rules in order (first match wins)
186///    - Check if caller matches any principal in the rule
187///    - If matched, return the rule's permission (ALLOW/DENY)
188/// 5. If no rule matches, deny by default (secure by default)
189fn check_acl_permission(
190    caller_id: Option<&ActrId>,
191    target_id: &ActrId,
192    acl: Option<&actr_protocol::Acl>,
193) -> Result<bool, String> {
194    // 1. Local calls (no caller_id) are always allowed
195    if caller_id.is_none() {
196        tracing::trace!("ACL: Local call, allowing");
197        return Ok(true);
198    }
199
200    let caller = caller_id.unwrap();
201
202    // 2. No ACL configured - allow by default
203    let acl_rules = match acl {
204        Some(acl) => acl,
205        None => {
206            tracing::trace!(
207                "ACL: No ACL configured, allowing {} -> {}",
208                caller.to_string_repr(),
209                target_id.to_string_repr()
210            );
211            return Ok(true);
212        }
213    };
214
215    // 3. If ACL is configured but has no rules, deny all (secure by default)
216    if acl_rules.rules.is_empty() {
217        tracing::warn!(
218            "ACL: ACL configured but no rules defined, denying {} -> {} (default deny)",
219            caller.to_string_repr(),
220            target_id.to_string_repr()
221        );
222        return Ok(false);
223    }
224
225    // 4. Iterate through ACL rules (first match wins)
226    for (rule_idx, rule) in acl_rules.rules.iter().enumerate() {
227        // Check if caller matches any principal in this rule
228        let mut matched = false;
229
230        // If no principals specified, skip this rule (empty allow list = no match)
231        if rule.principals.is_empty() {
232            tracing::trace!(
233                "ACL: Rule {} has empty principals list, skipping (no match)",
234                rule_idx
235            );
236            continue;
237        }
238
239        // Check each principal
240        for principal in &rule.principals {
241            if matches_principal(caller, principal) {
242                matched = true;
243                tracing::trace!(
244                    "ACL: Rule {} matched principal: caller={}, principal_realm={:?}, principal_type={:?}",
245                    rule_idx,
246                    caller.to_string_repr(),
247                    principal.realm.as_ref().map(|r| r.realm_id),
248                    principal.actr_type.as_ref().map(|t| &t.name)
249                );
250                break;
251            }
252        }
253
254        // If matched, return the permission
255        if matched {
256            let permission = rule.permission;
257            let is_allow = permission == actr_protocol::acl_rule::Permission::Allow as i32;
258
259            tracing::debug!(
260                "ACL: Rule {} matched, permission={} for {} -> {}",
261                rule_idx,
262                if is_allow { "ALLOW" } else { "DENY" },
263                caller.to_string_repr(),
264                target_id.to_string_repr()
265            );
266
267            return Ok(is_allow);
268        }
269    }
270
271    // 5. No rule matched - deny by default (secure by default)
272    tracing::warn!(
273        "ACL: No matching rule found, denying {} -> {} (default deny)",
274        caller.to_string_repr(),
275        target_id.to_string_repr()
276    );
277    Ok(false)
278}
279
280/// Check if a caller matches a principal
281///
282/// A principal matches if:
283/// - If principal.realm is specified, it must match caller.realm
284/// - If principal.actr_type is specified, it must match caller.type
285/// - If both are None, principal matches all (should not happen in practice)
286fn matches_principal(caller: &ActrId, principal: &actr_protocol::acl_rule::Principal) -> bool {
287    // Check realm match (if specified)
288    if let Some(ref principal_realm) = principal.realm
289        && caller.realm.realm_id != principal_realm.realm_id
290    {
291        return false;
292    }
293
294    // Check type match (if specified)
295    if let Some(ref principal_type) = principal.actr_type
296        && (caller.r#type.manufacturer != principal_type.manufacturer
297            || caller.r#type.name != principal_type.name)
298    {
299        return false;
300    }
301
302    // If we reach here, all specified fields matched
303    true
304}
305
306impl<W: Workload> ActrNode<W> {
307    /// Get Inproc Transport Manager
308    ///
309    /// # Returns
310    /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
311    /// - `None`: Not yet started (need to call start() first)
312    ///
313    /// # Use Cases
314    /// - Workload internals need to communicate with Shell
315    /// - Create custom LatencyFirst/MediaTrack channels
316    pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
317        self.inproc_mgr.clone()
318    }
319
320    /// Get ActorId (if registration has completed)
321    pub fn actor_id(&self) -> Option<&ActrId> {
322        self.actor_id.as_ref()
323    }
324
325    /// Get credential state (if registration has completed)
326    pub fn credential_state(&self) -> Option<CredentialState> {
327        self.credential_state.clone()
328    }
329
330    /// Get signaling client (for manual control such as UnregisterRequest)
331    pub fn signaling_client(&self) -> Arc<dyn SignalingClient> {
332        self.signaling_client.clone()
333    }
334
335    /// Get shutdown token for this node
336    pub fn shutdown_token(&self) -> CancellationToken {
337        self.shutdown_token.clone()
338    }
339
340    /// Discover remote actors of the specified type via signaling server.
341    ///
342    /// This requests best route candidates via `RouteCandidatesRequest` using the node's existing registration.
343    ///
344    /// The method returns the ordered list of candidate `ActrId`s reported by the signaling server.
345    ///
346    /// # Errors
347    /// - Returns `InvalidStateTransition` if the node is not started (no actor_id/credential).
348    ///   The node must be started via `start()` before calling this method.
349    /// - Returns `TransportError` if the signaling client is not connected.
350    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
351    pub async fn discover_route_candidates(
352        &self,
353        target_type: &ActrType,
354        candidate_count: u32,
355    ) -> ActorResult<Vec<ActrId>> {
356        // Check if node is started (has actor_id and credential)
357        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
358            actr_protocol::ProtocolError::InvalidStateTransition(
359                "Node is not started. Call start() first.".to_string(),
360            )
361        })?;
362
363        // Check if the signaling client is connected
364        if !self.signaling_client.is_connected() {
365            return Err(actr_protocol::ProtocolError::TransportError(
366                "Signaling client is not connected.".to_string(),
367            ));
368        }
369
370        let client = self.signaling_client.as_ref();
371
372        let criteria = route_candidates_request::NodeSelectionCriteria {
373            candidate_count,
374            ranking_factors: Vec::new(),
375            minimal_dependency_requirement: None,
376            minimal_health_requirement: None,
377        };
378
379        let route_request = RouteCandidatesRequest {
380            target_type: target_type.clone(),
381            criteria: Some(criteria),
382            client_location: None,
383        };
384
385        let credential_state = self.credential_state.clone().ok_or_else(|| {
386            actr_protocol::ProtocolError::InvalidStateTransition(
387                "Node is not started. Call start() first.".to_string(),
388            )
389        })?;
390        let route_response = client
391            .send_route_candidates_request(
392                actor_id.clone(),
393                credential_state.credential().await,
394                route_request,
395            )
396            .await
397            .map_err(|e| {
398                actr_protocol::ProtocolError::TransportError(format!(
399                    "Route candidates request failed: {e}"
400                ))
401            })?;
402
403        match route_response.result {
404            Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
405                Ok(success.candidates)
406            }
407            Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
408                Err(actr_protocol::ProtocolError::TransportError(format!(
409                    "Route candidates error {}: {}",
410                    err.code, err.message
411                )))
412            }
413            None => Err(actr_protocol::ProtocolError::TransportError(
414                "Invalid route candidates response: missing result".to_string(),
415            )),
416        }
417    }
418
419    /// Handle incoming message envelope
420    ///
421    /// # Performance Analysis
422    /// 1. create_context: ~10ns
423    /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
424    /// 3. User business logic: variable
425    ///
426    /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
427    ///
428    /// # Zero-cost Abstraction
429    /// - Compiler can inline entire call chain
430    /// - Match branches can be directly expanded
431    /// - Final generated code approaches hand-written match expression
432    ///
433    /// # Parameters
434    /// - `envelope`: The RPC envelope containing the message
435    /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
436    ///
437    /// # caller_id Design
438    ///
439    /// **Why not in RpcEnvelope?**
440    /// - Transport layer (WebRTC/Mailbox) already knows the sender
441    /// - All connections are direct P2P (no intermediaries)
442    /// - Storing in envelope would be redundant duplication
443    ///
444    /// **How it works:**
445    /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
446    /// - Only decoded when creating Context (once per message)
447    /// - Shell calls pass `None` (local process, no remote caller)
448    /// - Remote calls decode from `MessageRecord.from`
449    ///
450    /// **trace_id vs request_id:**
451    /// - `trace_id`: Distributed tracing across entire call chain (A → B → C)
452    /// - `request_id`: Unique identifier for each request-response pair
453    /// - Both kept for flexibility in complex scenarios
454    /// - Single-hop calls: effectively identical
455    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
456    #[cfg_attr(
457        feature = "opentelemetry",
458        tracing::instrument(skip_all, name = "ActrNode.handle_incoming")
459    )]
460    pub async fn handle_incoming(
461        &self,
462        envelope: RpcEnvelope,
463        caller_id: Option<&ActrId>,
464    ) -> ActorResult<Bytes> {
465        use actr_framework::MessageDispatcher;
466
467        // Log received message
468        if let Some(caller) = caller_id {
469            tracing::debug!(
470                "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
471                envelope.route_key,
472                caller.to_string_repr(),
473                envelope.request_id
474            );
475        } else {
476            tracing::debug!(
477                "📨 Handling incoming message: route_key={}, request_id={}",
478                envelope.route_key,
479                envelope.request_id
480            );
481        }
482
483        // 0. Get actor_id early for ACL check
484        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
485            actr_protocol::ProtocolError::InvalidStateTransition(
486                "Actor ID not set - node must be started before handling messages".to_string(),
487            )
488        })?;
489
490        // 0.1. ACL Permission Check (before processing message)
491        let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
492            .map_err(|err_msg| {
493                actr_protocol::ProtocolError::TransportError(format!(
494                    "ACL check failed: {}",
495                    err_msg
496                ))
497            })?;
498
499        if !acl_allowed {
500            tracing::warn!(
501                severity = 5,
502                error_category = "acl_denied",
503                request_id = %envelope.request_id,
504                route_key = %envelope.route_key,
505                caller = %caller_id.map(|c| c.to_string_repr()).unwrap_or_else(|| "<none>".to_string()),
506                "🚫 ACL: Permission denied"
507            );
508
509            return Err(actr_protocol::ProtocolError::Actr(
510                actr_protocol::ActrError::PermissionDenied {
511                    message: format!(
512                        "ACL denied: {} is not allowed to call {}",
513                        caller_id
514                            .map(|c| c.to_string_repr())
515                            .unwrap_or_else(|| "<unknown>".to_string()),
516                        actor_id.to_string_repr()
517                    ),
518                },
519            ));
520        }
521
522        // 1. Create Context with caller_id from transport layer
523        let credential_state = self.credential_state.clone().ok_or_else(|| {
524            actr_protocol::ProtocolError::InvalidStateTransition(
525                "Credential not set - node must be started before handling messages".to_string(),
526            )
527        })?;
528        let ctx = self
529            .context_factory
530            .as_ref()
531            .expect("ContextFactory must be initialized in start()")
532            .create(
533                actor_id,
534                caller_id, // caller_id from transport layer (MessageRecord.from)
535                &envelope.request_id,
536                &credential_state.credential().await,
537            );
538
539        // 2. Static MessageRouter dispatch (zero-cost abstraction)
540        // Compiler will inline entire call chain, generating code close to hand-written match
541        //
542        // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
543        let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
544            &self.workload,
545            envelope.clone(),
546            &ctx,
547        ))
548        .catch_unwind()
549        .await;
550
551        let result = match result {
552            Ok(handler_result) => handler_result,
553            Err(panic_payload) => {
554                // Handler panicked - extract panic info
555                let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
556                    s.to_string()
557                } else if let Some(s) = panic_payload.downcast_ref::<String>() {
558                    s.clone()
559                } else {
560                    "Unknown panic payload".to_string()
561                };
562
563                tracing::error!(
564                    severity = 8,
565                    error_category = "handler_panic",
566                    route_key = envelope.route_key,
567                    request_id = %envelope.request_id,
568                    "❌ Handler panicked: {}",
569                    panic_info
570                );
571
572                // Return DecodeFailure error with panic info
573                // (using DecodeFailure as a proxy for "cannot process message")
574                Err(actr_protocol::ProtocolError::Actr(
575                    actr_protocol::ActrError::DecodeFailure {
576                        message: format!("Handler panicked: {panic_info}"),
577                    },
578                ))
579            }
580        };
581
582        // 3. Log result
583        match &result {
584            Ok(_) => tracing::debug!(
585                request_id = %envelope.request_id,
586                route_key = %envelope.route_key,
587                "✅ Message handled successfully"
588            ),
589            Err(e) => tracing::error!(
590                severity = 6,
591                error_category = "handler_error",
592                request_id = %envelope.request_id,
593                route_key = %envelope.route_key,
594                "❌ Message handling failed: {:?}", e
595            ),
596        }
597
598        result
599    }
600
601    /// Start the system
602    ///
603    /// # Startup Sequence
604    /// 1. Connect to signaling server and register Actor
605    /// 2. Initialize transport layer (WebRTC)
606    /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
607    /// 4. Start Mailbox processing loop (State Path serial processing)
608    /// 5. Start Transport (begin receiving messages)
609    /// 6. Create ActrRef for Shell to interact with Workload
610    ///
611    /// # Returns
612    /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
613    pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
614        tracing::info!("🚀 Starting ActrNode");
615
616        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
617        // 1. Connect to signaling server and register
618        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
619        tracing::info!("📡 Connecting to signaling server");
620        self.signaling_client.connect().await.map_err(|e| {
621            actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
622        })?;
623        tracing::info!("✅ Connected to signaling server");
624
625        // Get ActrType from configuration
626        let actr_type = self.config.actr_type().clone();
627        tracing::info!("📋 Actor type: {}", actr_type.to_string_repr());
628
629        // Calculate ServiceSpec from config exports
630        let service_spec = self.config.calculate_service_spec();
631        if let Some(ref spec) = service_spec {
632            tracing::info!("📦 Service fingerprint: {}", spec.fingerprint);
633            tracing::info!("📦 Service tags: {:?}", spec.tags);
634        } else {
635            tracing::info!("📦 No proto exports, ServiceSpec is None");
636        }
637
638        // Construct protobuf RegisterRequest
639        let register_request = RegisterRequest {
640            actr_type: actr_type.clone(),
641            realm: self.config.realm,
642            service_spec,
643            acl: self.config.acl.clone(),
644        };
645
646        tracing::info!("📤 Registering actor with signaling server (protobuf)");
647
648        // Use send_register_request to send and wait for response
649        let register_response = self
650            .signaling_client
651            .send_register_request(register_request)
652            .await
653            .map_err(|e| {
654                actr_protocol::ProtocolError::TransportError(format!(
655                    "Actor registration failed: {e}"
656                ))
657            })?;
658
659        // Handle RegisterResponse oneof result
660        //
661        // Collect background task handles (including unregister task) so they can be managed
662        // by ActrRefShared later.
663        let mut task_handles = Vec::new();
664
665        match register_response.result {
666            Some(register_response::Result::Success(register_ok)) => {
667                let actor_id = register_ok.actr_id;
668                let credential = register_ok.credential;
669
670                tracing::info!("✅ Registration successful");
671                tracing::info!(
672                    "🆔 Assigned ActrId: {}",
673                    actr_protocol::ActrIdExt::to_string_repr(&actor_id)
674                );
675                tracing::info!(
676                    "🔐 Received credential (token_key_id: {})",
677                    credential.token_key_id
678                );
679                tracing::info!(
680                    "💓 Signaling heartbeat interval: {} seconds",
681                    register_ok.signaling_heartbeat_interval_secs
682                );
683
684                // Log additional information (if available)
685                if register_ok.psk.is_some() {
686                    tracing::debug!("🔑 Received PSK (bootstrap keying material)");
687                }
688
689                if let Some(expires_at) = &register_ok.credential_expires_at {
690                    tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
691                }
692
693                // Store ActrId and Credential
694                self.actor_id = Some(actor_id.clone());
695                let credential_state = CredentialState::new(
696                    credential,
697                    register_ok.credential_expires_at,
698                    register_ok.psk.clone(),
699                );
700                self.credential_state = Some(credential_state.clone());
701
702                // Pass identity to signaling client so reconnect URLs carry auth info.
703                self.signaling_client.set_actor_id(actor_id.clone()).await;
704                self.signaling_client
705                    .set_credential_state(credential_state.clone())
706                    .await;
707                // Store PSK and public_key for TURN authentication
708                self.psk = register_ok.psk.clone();
709
710                // Persist identity into ContextFactory for later Context creation
711                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
712                // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
713                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
714                let shell_to_workload = self
715                    .context_factory
716                    .as_ref()
717                    .expect("ContextFactory must exist")
718                    .shell_to_workload();
719                let workload_to_shell = self
720                    .context_factory
721                    .as_ref()
722                    .expect("ContextFactory must exist")
723                    .workload_to_shell();
724                self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
725                self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
726
727                tracing::info!(
728                    "✅ Inproc infrastructure already ready (created in ActrSystem::new())"
729                );
730
731                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
732                // 1.5. Create WebRTC infrastructure
733                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
734                tracing::info!("🌐 Initializing WebRTC infrastructure");
735
736                // Get MediaFrameRegistry from ContextFactory
737                let media_frame_registry = self
738                    .context_factory
739                    .as_ref()
740                    .expect("ContextFactory must exist")
741                    .media_frame_registry
742                    .clone();
743
744                // Create WebRtcCoordinator
745                let coordinator =
746                    Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
747                        actor_id.clone(),
748                        credential_state.clone(),
749                        self.signaling_client.clone(),
750                        self.config.webrtc.clone(),
751                        self.config.realm.realm_id.clone(),
752                        media_frame_registry,
753                    ));
754
755                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
756                // 1.6. Create OutprocTransportManager + OutprocOutGate (新架构)
757                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
758                tracing::info!("🏗️  Creating OutprocTransportManager with WebRTC support");
759
760                // Create DefaultWireBuilder with WebRTC coordinator
761                use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
762                let wire_builder_config = DefaultWireBuilderConfig {
763                    websocket_url_template: None, // WebSocket disabled for now
764                    enable_webrtc: true,
765                    enable_websocket: false,
766                };
767                let wire_builder = Arc::new(DefaultWireBuilder::new(
768                    Some(coordinator.clone()),
769                    wire_builder_config,
770                ));
771
772                // Create OutprocTransportManager
773                use crate::transport::OutprocTransportManager;
774                let transport_manager =
775                    Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
776
777                // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
778                use crate::outbound::{OutGate, OutprocOutGate};
779                let outproc_gate = Arc::new(OutprocOutGate::new(
780                    transport_manager,
781                    Some(coordinator.clone()), // Enable MediaTrack support
782                ));
783                let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
784
785                tracing::info!("✅ OutprocTransportManager + OutprocOutGate initialized");
786
787                // Get DataStreamRegistry from ContextFactory
788                let data_stream_registry = self
789                    .context_factory
790                    .as_ref()
791                    .expect("ContextFactory must exist")
792                    .data_stream_registry
793                    .clone();
794
795                // Create WebRtcGate with shared pending_requests and DataStreamRegistry
796                let pending_requests = outproc_gate.get_pending_requests();
797                let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
798                    coordinator.clone(),
799                    pending_requests,
800                    data_stream_registry,
801                ));
802
803                // Set local_id
804                gate.set_local_id(actor_id.clone()).await;
805
806                tracing::info!(
807                    "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
808                );
809
810                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
811                // 1.7. Set outproc_gate in ContextFactory (completing initialization)
812                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
813                tracing::info!("🔧 Setting outproc_gate in ContextFactory");
814                self.context_factory
815                    .as_mut()
816                    .expect("ContextFactory must exist")
817                    .set_outproc_gate(outproc_gate_enum);
818
819                tracing::info!(
820                    "✅ ContextFactory fully initialized (inproc + outproc gates ready)"
821                );
822
823                // Save references (WebRtcGate kept for backward compatibility if needed)
824                self.webrtc_coordinator = Some(coordinator.clone());
825                self.webrtc_gate = Some(gate.clone());
826
827                tracing::info!("✅ WebRTC infrastructure initialized");
828
829                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
830                // 1.7.5. Create shared state for credential management
831                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
832                // Shared credential state initialized above; reused across tasks
833
834                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
835                // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
836                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
837                {
838                    let shutdown = self.shutdown_token.clone();
839                    let client = self.signaling_client.clone();
840                    let actor_id_for_heartbeat = actor_id.clone();
841                    let credential_state_for_heartbeat = credential_state.clone();
842                    let mailbox_for_heartbeat = self.mailbox.clone();
843
844                    // Use interval from registration response, default to 30s
845                    let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
846                    let heartbeat_interval = if heartbeat_interval_secs > 0 {
847                        Duration::from_secs(heartbeat_interval_secs as u64)
848                    } else {
849                        Duration::from_secs(30)
850                    };
851
852                    let heartbeat_handle = tokio::spawn(heartbeat_task(
853                        shutdown,
854                        client,
855                        actor_id_for_heartbeat,
856                        credential_state_for_heartbeat,
857                        mailbox_for_heartbeat,
858                        heartbeat_interval,
859                    ));
860
861                    task_handles.push(heartbeat_handle);
862                }
863                tracing::info!(
864                    "✅ Heartbeat task started (interval: {}s)",
865                    register_ok.signaling_heartbeat_interval_secs
866                );
867
868                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
869                // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
870                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
871                //
872                // This task:
873                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
874                // - Then sends UnregisterRequest via signaling client with a timeout
875                //
876                // NOTE: we push its JoinHandle into task_handles so it can be aborted
877                // by ActrRefShared::Drop if needed.
878                {
879                    let shutdown = self.shutdown_token.clone();
880                    let client = self.signaling_client.clone();
881                    let actor_id_for_unreg = actor_id.clone();
882                    let credential_state_for_unreg = credential_state.clone();
883                    let webrtc_coordinator = self.webrtc_coordinator.clone();
884
885                    let unregister_handle = tokio::spawn(async move {
886                        // Wait for shutdown signal
887                        shutdown.cancelled().await;
888                        tracing::info!(
889                            "📡 Shutdown signal received2, sending UnregisterRequest for Actor {:?}",
890                            actor_id_for_unreg
891                        );
892
893                        // 1. First close all WebRTC peer connections (if a coordinator exists)
894                        if let Some(coord) = webrtc_coordinator {
895                            if let Err(e) = coord.close_all_peers().await {
896                                tracing::warn!(
897                                    "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
898                                    e
899                                );
900                            } else {
901                                tracing::info!(
902                                    "✅ All WebRTC peers closed before UnregisterRequest"
903                                );
904                            }
905                        } else {
906                            tracing::debug!(
907                                "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
908                            );
909                        }
910
911                        // 2. Then send the UnregisterRequest, bounded by a timeout (5 seconds)
912                        let result = tokio::time::timeout(
913                            std::time::Duration::from_secs(5),
914                            client.send_unregister_request(
915                                actor_id_for_unreg.clone(),
916                                credential_state_for_unreg.credential().await,
917                                Some("Graceful shutdown".to_string()),
918                            ),
919                        )
920                        .await;
921                        tracing::info!("UnregisterRequest result: {:?}", result);
922                        match result {
923                            Ok(Ok(_)) => {
924                                tracing::info!(
925                                    "✅ UnregisterRequest sent to signaling server for Actor {:?}",
926                                    actor_id_for_unreg
927                                );
928                            }
929                            Ok(Err(e)) => {
930                                tracing::warn!(
931                                    "⚠️ Failed to send UnregisterRequest for Actor {:?}: {}",
932                                    actor_id_for_unreg,
933                                    e
934                                );
935                            }
936                            Err(_) => {
937                                tracing::warn!(
938                                    "⚠️ UnregisterRequest timeout (5s) for Actor {:?}",
939                                    actor_id_for_unreg
940                                );
941                            }
942                        }
943                    });
944
945                    task_handles.push(unregister_handle);
946                }
947
948                // Spawn signaling auto-reconnect helper that reacts immediately to disconnect events.
949                crate::wire::webrtc::spawn_signaling_reconnector(
950                    self.signaling_client.clone(),
951                    self.shutdown_token.clone(),
952                );
953            }
954            Some(register_response::Result::Error(error)) => {
955                tracing::error!(
956                    severity = 10,
957                    error_category = "registration_error",
958                    error_code = error.code,
959                    "❌ Registration failed: code={}, message={}",
960                    error.code,
961                    error.message
962                );
963                return Err(actr_protocol::ProtocolError::TransportError(format!(
964                    "Registration rejected: {} (code: {})",
965                    error.message, error.code
966                )));
967            }
968            None => {
969                tracing::error!(
970                    severity = 10,
971                    error_category = "registration_error",
972                    "❌ Registration response missing result"
973                );
974                return Err(actr_protocol::ProtocolError::TransportError(
975                    "Invalid registration response: missing result".to_string(),
976                ));
977            }
978        }
979
980        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
981        // 2. Transport layer initialization (completed via WebRTC infrastructure)
982        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
983        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
984
985        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
986        // 3.1 Convert to Arc (before starting background loops)
987        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
988        // Clone actor_id before moving self into Arc
989        let actor_id = self
990            .actor_id
991            .as_ref()
992            .ok_or_else(|| {
993                actr_protocol::ProtocolError::InvalidStateTransition(
994                    "Actor ID not set - registration must complete before starting node"
995                        .to_string(),
996                )
997            })?
998            .clone();
999        let credential_state = self.credential_state.clone().ok_or_else(|| {
1000            actr_protocol::ProtocolError::InvalidStateTransition(
1001                "Credential not set - node must be started before handling messages".to_string(),
1002            )
1003        })?;
1004
1005        let actor_id_for_shell = actor_id.clone();
1006        let shutdown_token = self.shutdown_token.clone();
1007        let node_ref = Arc::new(self);
1008
1009        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1010        // 3.5. Start WebRTC background loops (BEFORE on_start)
1011        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1012        // CRITICAL: Start signaling loop before on_start() to avoid deadlock
1013        // where on_start() tries to send messages but signaling loop isn't running
1014        tracing::info!("🚀 Starting WebRTC background loops");
1015
1016        // Start WebRtcCoordinator signaling loop
1017        if let Some(coordinator) = &node_ref.webrtc_coordinator {
1018            coordinator.clone().start().await.map_err(|e| {
1019                actr_protocol::ProtocolError::TransportError(format!(
1020                    "WebRtcCoordinator start failed: {e}"
1021                ))
1022            })?;
1023            tracing::info!("✅ WebRtcCoordinator signaling loop started");
1024        }
1025
1026        // Start WebRtcGate message receive loop (route to Mailbox)
1027        if let Some(gate) = &node_ref.webrtc_gate {
1028            gate.start_receive_loop(node_ref.mailbox.clone())
1029                .await
1030                .map_err(|e| {
1031                    actr_protocol::ProtocolError::TransportError(format!(
1032                        "WebRtcGate receive loop start failed: {e}"
1033                    ))
1034                })?;
1035            tracing::info!("✅ WebRtcGate → Mailbox routing started");
1036        }
1037
1038        tracing::info!("✅ WebRTC background loops started");
1039
1040        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1041        // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
1042        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1043        tracing::info!("🪝 Calling lifecycle hook: on_start");
1044
1045        let ctx = node_ref
1046            .context_factory
1047            .as_ref()
1048            .expect("ContextFactory must be initialized before on_start")
1049            .create(
1050                &actor_id,
1051                None,        // caller_id
1052                "bootstrap", // request_id
1053                &credential_state.credential().await,
1054            );
1055        node_ref.workload.on_start(&ctx).await?;
1056        tracing::info!("✅ Lifecycle hook on_start completed");
1057
1058        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1059        // 4.6. Start Inproc receive loop (Shell → Workload)
1060        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1061        tracing::info!("🔄 Starting Inproc receive loop (Shell → Workload)");
1062        // Start Workload receive loop (Shell → Workload REQUEST)
1063        if let Some(shell_to_workload) = &node_ref.inproc_mgr {
1064            if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
1065                let node = node_ref.clone();
1066                let request_rx_lane = shell_to_workload
1067                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
1068                    .await
1069                    .map_err(|e| {
1070                        actr_protocol::ProtocolError::TransportError(format!(
1071                            "Failed to get Workload receive lane: {e}"
1072                        ))
1073                    })?;
1074                let response_tx = workload_to_shell.clone();
1075                let shutdown = shutdown_token.clone();
1076
1077                let inproc_handle = tokio::spawn(async move {
1078                    loop {
1079                        tokio::select! {
1080                            _ = shutdown.cancelled() => {
1081                                tracing::info!("📭 Workload receive loop (Shell → Workload) received shutdown signal");
1082                                break;
1083                            }
1084
1085                            envelope_result = request_rx_lane.recv_envelope() => {
1086                                match envelope_result {
1087                                    Ok(envelope) => {
1088                                        let request_id = envelope.request_id.clone();
1089                                        // Extract and set tracing context from envelope
1090                                        #[cfg(feature = "opentelemetry")]
1091                                        let span = {
1092                                                let span = tracing::info_span!("actrNode.lane.receive_rpc", request_id = %request_id);
1093                                                set_parent_from_rpc_envelope(&span, &envelope);
1094                                                span
1095                                            };
1096
1097                                        tracing::debug!("📨 Workload received REQUEST from Shell: request_id={}", request_id);
1098
1099                                        // Shell calls have no caller_id (local process communication)
1100                                        let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1101                                        #[cfg(feature = "opentelemetry")]
1102                                        let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1103                                        match handle_incoming_fut.await {
1104                                            Ok(response_bytes) => {
1105                                                // Send RESPONSE back via workload_to_shell
1106                                                // Keep same route_key (no prefix needed - separate channels!)
1107                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1108                                                let mut response_envelope = RpcEnvelope {
1109                                                    route_key: envelope.route_key.clone(),
1110                                                    payload: Some(response_bytes),
1111                                                    error: None,
1112                                                    traceparent: None,
1113                                                    tracestate: None,
1114                                                    request_id: request_id.clone(),
1115                                                    metadata: Vec::new(),
1116                                                    timeout_ms: 30000,
1117                                                };
1118                                                // Inject tracing context
1119                                                #[cfg(feature = "opentelemetry")]
1120                                                inject_span_context_to_rpc(&span, &mut response_envelope);
1121
1122                                                // Send via Workload → Shell channel
1123                                                let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1124                                                #[cfg(feature = "opentelemetry")]
1125                                                let send_response_fut = send_response_fut.instrument(span.clone());
1126                                                if let Err(e) = send_response_fut.await {
1127                                                    tracing::error!(
1128                                                        severity = 7,
1129                                                        error_category = "transport_error",
1130                                                        request_id = %request_id,
1131                                                        "❌ Failed to send RESPONSE to Shell: {:?}", e
1132                                                    );
1133                                                }
1134                                            }
1135                                            Err(e) => {
1136                                                tracing::error!(
1137                                                    severity = 6,
1138                                                    error_category = "handler_error",
1139                                                    request_id = %request_id,
1140                                                    route_key = %envelope.route_key,
1141                                                    "❌ Workload message handling failed: {:?}", e
1142                                                );
1143
1144                                                // Send error response (system-level error on envelope)
1145                                                let error_response = actr_protocol::ErrorResponse {
1146                                                    code: protocol_error_to_code(&e),
1147                                                    message: e.to_string(),
1148                                                };
1149
1150                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1151                                                let mut error_envelope = RpcEnvelope {
1152                                                    route_key: envelope.route_key.clone(),
1153                                                    payload: None,
1154                                                    error: Some(error_response),
1155                                                    traceparent: envelope.traceparent.clone(),
1156                                                    tracestate: envelope.tracestate.clone(),
1157                                                    request_id: request_id.clone(),
1158                                                    metadata: Vec::new(),
1159                                                    timeout_ms: 30000,
1160                                                };
1161                                                // Inject tracing context
1162                                                #[cfg(feature = "opentelemetry")]
1163                                                inject_span_context_to_rpc(&span, &mut error_envelope);
1164
1165                                                let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1166                                                #[cfg(feature = "opentelemetry")]
1167                                                let send_error_response_fut = send_error_response_fut.instrument(span);
1168                                                if let Err(e) = send_error_response_fut.await {
1169                                                    tracing::error!(
1170                                                        severity = 7,
1171                                                        error_category = "transport_error",
1172                                                        request_id = %request_id,
1173                                                        "❌ Failed to send ERROR response to Shell: {:?}", e
1174                                                    );
1175                                                }
1176                                            }
1177                                        }
1178                                    }
1179                                    Err(e) => {
1180                                        tracing::error!(
1181                                            severity = 8,
1182                                            error_category = "transport_error",
1183                                            "❌ Failed to receive from Shell → Workload lane: {:?}", e
1184                                        );
1185                                        break;
1186                                    }
1187                                }
1188                            }
1189                        }
1190                    }
1191                    tracing::info!(
1192                        "✅ Workload receive loop (Shell → Workload) terminated gracefully"
1193                    );
1194                });
1195
1196                task_handles.push(inproc_handle);
1197            }
1198        }
1199        tracing::info!("✅ Workload receive loop (Shell → Workload REQUEST) started");
1200
1201        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1202        // 4.7. Start Shell receive loop (Workload → Shell RESPONSE)
1203        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1204        tracing::info!("🔄 Starting Shell receive loop (Workload → Shell RESPONSE)");
1205        if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
1206            if let Some(shell_to_workload) = &node_ref.inproc_mgr {
1207                let response_rx_lane = workload_to_shell
1208                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
1209                    .await
1210                    .map_err(|e| {
1211                        actr_protocol::ProtocolError::TransportError(format!(
1212                            "Failed to get Shell receive lane: {e}"
1213                        ))
1214                    })?;
1215                let request_mgr = shell_to_workload.clone();
1216                let shutdown = shutdown_token.clone();
1217
1218                let shell_receive_handle = tokio::spawn(async move {
1219                    loop {
1220                        tokio::select! {
1221                            _ = shutdown.cancelled() => {
1222                                tracing::info!("📭 Shell receive loop (Workload → Shell) received shutdown signal");
1223                                break;
1224                            }
1225
1226                            envelope_result = response_rx_lane.recv_envelope() => {
1227                                match envelope_result {
1228                                    Ok(envelope) => {
1229                                        tracing::debug!("📨 Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
1230
1231                                        // Check if response is success or error
1232                                        match (envelope.payload, envelope.error) {
1233                                            (Some(payload), None) => {
1234                                                // Success response
1235                                                if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
1236                                                    tracing::warn!(
1237                                                        severity = 4,
1238                                                        error_category = "orphan_response",
1239                                                        request_id = %envelope.request_id,
1240                                                        "⚠️  No pending request found for response: {:?}", e
1241                                                    );
1242                                                }
1243                                            }
1244                                            (None, Some(error)) => {
1245                                                // Error response - convert to ProtocolError and complete with error
1246                                                let protocol_err = actr_protocol::ProtocolError::TransportError(
1247                                                    format!("RPC error {}: {}", error.code, error.message)
1248                                                );
1249                                                if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
1250                                                    tracing::warn!(
1251                                                        severity = 4,
1252                                                        error_category = "orphan_response",
1253                                                        request_id = %envelope.request_id,
1254                                                        "⚠️  No pending request found for error response: {:?}", e
1255                                                    );
1256                                                }
1257                                            }
1258                                            _ => {
1259                                                tracing::error!(
1260                                                    severity = 7,
1261                                                    error_category = "protocol_error",
1262                                                    request_id = %envelope.request_id,
1263                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1264                                                );
1265                                            }
1266                                        }
1267                                    }
1268                                    Err(e) => {
1269                                        tracing::error!(
1270                                            severity = 8,
1271                                            error_category = "transport_error",
1272                                            "❌ Failed to receive from Workload → Shell lane: {:?}", e
1273                                        );
1274                                        break;
1275                                    }
1276                                }
1277                            }
1278                        }
1279                    }
1280                    tracing::info!(
1281                        "✅ Shell receive loop (Workload → Shell) terminated gracefully"
1282                    );
1283                });
1284
1285                task_handles.push(shell_receive_handle);
1286            }
1287        }
1288        tracing::info!("✅ Shell receive loop (Workload → Shell RESPONSE) started");
1289
1290        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1291        // 5. Start Mailbox processing loop (State Path)
1292        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1293        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
1294        {
1295            let node = node_ref.clone();
1296            let mailbox = node_ref.mailbox.clone();
1297            let gate = node_ref.webrtc_gate.clone();
1298            let shutdown = shutdown_token.clone();
1299
1300            let mailbox_handle = tokio::spawn(async move {
1301                loop {
1302                    tokio::select! {
1303                        // Listen for shutdown signal
1304                        _ = shutdown.cancelled() => {
1305                            tracing::info!("📭 Mailbox loop received shutdown signal");
1306                            break;
1307                        }
1308
1309                        // Dequeue messages (by priority)
1310                        result = mailbox.dequeue() => {
1311                            match result {
1312                                Ok(messages) => {
1313                                    if messages.is_empty() {
1314                                        // Queue empty, sleep briefly
1315                                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1316                                        continue;
1317                                    }
1318
1319                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
1320
1321                                    // Process messages one by one
1322                                    for msg_record in messages {
1323                                        // Deserialize RpcEnvelope (Protobuf)
1324                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
1325                                            Ok(envelope) => {
1326                                                let request_id = envelope.request_id.clone();
1327                                                #[cfg(feature = "opentelemetry")]
1328                                                let span = {
1329                                                        let span = tracing::info_span!("actrNode.mailbox.receive_rpc", request_id = %request_id);
1330                                                        set_parent_from_rpc_envelope(&span, &envelope);
1331                                                        span
1332                                                    };
1333                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
1334
1335                                                // Decode caller_id from MessageRecord.from (transport layer)
1336                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
1337                                                let caller_id_ref = caller_id_result.as_ref().ok();
1338
1339                                                if caller_id_ref.is_none() {
1340                                                    tracing::warn!(
1341                                                        request_id = %request_id,
1342                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
1343                                                    );
1344                                                }
1345
1346                                                // Call handle_incoming with caller_id from transport layer
1347                                                let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
1348                                                #[cfg(feature = "opentelemetry")]
1349                                                let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1350                                                match handle_incoming_fut.await {
1351                                                    Ok(response_bytes) => {
1352                                                        // Send response (reuse request_id)
1353                                                        if let Some(ref gate) = gate {
1354                                                            // Use already decoded caller_id
1355                                                            match caller_id_result {
1356                                                                Ok(caller) => {
1357                                                                    // Construct response RpcEnvelope (reuse request_id!)
1358                                                                    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1359                                                                    let mut response_envelope = RpcEnvelope {
1360                                                                        request_id,  // Reuse!
1361                                                                        route_key: envelope.route_key.clone(),
1362                                                                        payload: Some(response_bytes),
1363                                                                        error: None,
1364                                                                        traceparent: envelope.traceparent.clone(),
1365                                                                        tracestate: envelope.tracestate.clone(),
1366                                                                        metadata: Vec::new(),  // Response doesn't need extra metadata
1367                                                                        timeout_ms: 30000,
1368                                                                    };
1369                                                                    // Inject tracing context
1370                                                                    #[cfg(feature = "opentelemetry")]
1371                                                                    inject_span_context_to_rpc(&span, &mut response_envelope);
1372
1373                                                                    let send_response_fut = gate.send_response(&caller, response_envelope);
1374                                                                    #[cfg(feature = "opentelemetry")]
1375                                                                    let send_response_fut = send_response_fut.instrument(span);
1376                                                                    if let Err(e) = send_response_fut.await {
1377                                                                        tracing::error!(
1378                                                                            severity = 7,
1379                                                                            error_category = "transport_error",
1380                                                                            request_id = %envelope.request_id,
1381                                                                            "❌ Failed to send response: {:?}", e
1382                                                                        );
1383                                                                    }
1384                                                                }
1385                                                                Err(e) => {
1386                                                                    tracing::error!(
1387                                                                        severity = 8,
1388                                                                        error_category = "protobuf_decode",
1389                                                                        request_id = %envelope.request_id,
1390                                                                        "❌ Failed to decode caller_id: {:?}", e
1391                                                                    );
1392                                                                }
1393                                                            }
1394                                                        }
1395
1396                                                        // ACK message
1397                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
1398                                                            tracing::error!(
1399                                                                severity = 9,
1400                                                                error_category = "mailbox_error",
1401                                                                request_id = %envelope.request_id,
1402                                                                message_id = %msg_record.id,
1403                                                                "❌ Mailbox ACK failed: {:?}", e
1404                                                            );
1405                                                        }
1406                                                    }
1407                                                    Err(e) => {
1408                                                        tracing::error!(
1409                                                            severity = 6,
1410                                                            error_category = "handler_error",
1411                                                            request_id = %envelope.request_id,
1412                                                            route_key = %envelope.route_key,
1413                                                            "❌ handle_incoming failed: {:?}", e
1414                                                        );
1415                                                        // ACK to avoid infinite retries
1416                                                        // Application errors are caller's responsibility
1417                                                        let _ = mailbox.ack(msg_record.id).await;
1418                                                    }
1419                                                }
1420                                            }
1421                                            Err(e) => {
1422                                                // Poison message - cannot decode RpcEnvelope
1423                                                tracing::error!(
1424                                                    severity = 9,
1425                                                    error_category = "protobuf_decode",
1426                                                    message_id = %msg_record.id,
1427                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
1428                                                );
1429
1430                                                // Write to Dead Letter Queue
1431                                                use actr_mailbox::DlqRecord;
1432                                                use chrono::Utc;
1433                                                use uuid::Uuid;
1434
1435                                                let dlq_record = DlqRecord {
1436                                                    id: Uuid::new_v4(),
1437                                                    original_message_id: Some(msg_record.id.to_string()),
1438                                                    from: Some(msg_record.from.clone()),
1439                                                    to: node.actor_id.as_ref().map(|id| {
1440                                                        let mut buf = Vec::new();
1441                                                        id.encode(&mut buf).unwrap();
1442                                                        buf
1443                                                    }),
1444                                                    raw_bytes: msg_record.payload.clone(),
1445                                                    error_message: format!("Protobuf decode failed: {e}"),
1446                                                    error_category: "protobuf_decode".to_string(),
1447                                                    trace_id: format!("mailbox-{}", msg_record.id),  // Fallback trace_id
1448                                                    request_id: None,
1449                                                    created_at: Utc::now(),
1450                                                    redrive_attempts: 0,
1451                                                    last_redrive_at: None,
1452                                                    context: Some(format!(
1453                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
1454                                                        match msg_record.priority {
1455                                                            actr_mailbox::MessagePriority::High => "high",
1456                                                            actr_mailbox::MessagePriority::Normal => "normal",
1457                                                        }
1458                                                    )),
1459                                                };
1460
1461                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
1462                                                    tracing::error!(
1463                                                        severity = 10,
1464                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
1465                                                    );
1466                                                } else {
1467                                                    tracing::warn!(
1468                                                        severity = 9,
1469                                                        "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
1470                                                    );
1471                                                }
1472
1473                                                // ACK the poison message to remove from mailbox
1474                                                let _ = mailbox.ack(msg_record.id).await;
1475                                            }
1476                                        }
1477                                    }
1478                                }
1479                                Err(e) => {
1480                                    tracing::error!(
1481                                        severity = 9,
1482                                        error_category = "mailbox_error",
1483                                        "❌ Mailbox dequeue failed: {:?}", e
1484                                    );
1485                                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1486                                }
1487                            }
1488                        }
1489                    }
1490                }
1491                tracing::info!("✅ Mailbox processing loop terminated gracefully");
1492            });
1493
1494            task_handles.push(mailbox_handle);
1495        }
1496        tracing::info!("✅ Mailbox processing loop started");
1497
1498        tracing::info!("✅ ActrNode started successfully");
1499
1500        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1501        // 6. Create ActrRef for Shell to interact with Workload
1502        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1503        use crate::actr_ref::{ActrRef, ActrRefShared};
1504        use crate::outbound::InprocOutGate;
1505
1506        // Create InprocOutGate from shell_to_workload transport manager
1507        let shell_to_workload = node_ref
1508            .inproc_mgr
1509            .clone()
1510            .expect("inproc_mgr must be initialized");
1511        let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
1512
1513        // Create ActrRefShared
1514        let actr_ref_shared = Arc::new(ActrRefShared {
1515            actor_id: actor_id_for_shell.clone(),
1516            inproc_gate,
1517            shutdown_token: shutdown_token.clone(),
1518            task_handles: tokio::sync::Mutex::new(task_handles),
1519        });
1520
1521        // Create ActrRef
1522        let actr_ref = ActrRef::new(actr_ref_shared, node_ref);
1523
1524        tracing::info!("✅ ActrRef created (Shell → Workload communication handle)");
1525
1526        Ok(actr_ref)
1527    }
1528}
1529
#[cfg(test)]
mod tests {
    use super::*;
    use actr_protocol::AIdCredential;
    use prost_types::Timestamp;

    /// Builds a credential carrying the fixed token bytes `[1, 2, 3, 4]`
    /// and the supplied key id.
    fn create_test_credential(token_key_id: u32) -> AIdCredential {
        AIdCredential {
            token_key_id,
            encrypted_token: vec![1, 2, 3, 4].into(),
        }
    }

    /// Builds a whole-second timestamp (nanos are always zero).
    fn create_test_timestamp(seconds: i64) -> Timestamp {
        Timestamp { nanos: 0, seconds }
    }

    /// A freshly constructed state returns the credential and expiry it was given.
    #[tokio::test]
    async fn test_credential_state_initialization() {
        let expected_expiry = Some(create_test_timestamp(1000));
        let state = CredentialState::new(create_test_credential(1), expected_expiry, None);

        let stored = state.credential().await;
        assert_eq!(stored.token_key_id, 1);
        assert_eq!(stored.encrypted_token.as_ref(), &[1, 2, 3, 4]);

        let stored_expiry = state.expires_at().await;
        assert_eq!(stored_expiry, expected_expiry);
    }

    /// Constructing the state with no expiry yields `None` from `expires_at()`.
    #[tokio::test]
    async fn test_credential_state_without_expiration() {
        let state = CredentialState::new(create_test_credential(2), None, None);

        let stored = state.credential().await;
        assert_eq!(stored.token_key_id, 2);

        let stored_expiry = state.expires_at().await;
        assert!(stored_expiry.is_none());
    }

    /// `update()` replaces both the credential and the expiry timestamp.
    #[tokio::test]
    async fn test_credential_state_update() {
        let state = CredentialState::new(
            create_test_credential(1),
            Some(create_test_timestamp(1000)),
            None,
        );

        // Baseline: the state still holds what it was constructed with.
        let before = state.credential().await;
        assert_eq!(before.token_key_id, 1);

        // Swap in a second credential with a later expiry.
        let replacement = create_test_credential(2);
        state
            .update(replacement.clone(), Some(create_test_timestamp(2000)), None)
            .await;

        // The replacement must now be what readers observe.
        let after = state.credential().await;
        assert_eq!(after.token_key_id, 2);
        assert_eq!(after.encrypted_token, replacement.encrypted_token);

        let expiry_after = state.expires_at().await;
        assert_eq!(expiry_after, Some(create_test_timestamp(2000)));
    }

    /// Ten spawned readers sharing cloned handles all see the initial credential.
    #[tokio::test]
    async fn test_credential_state_concurrent_access() {
        let state = CredentialState::new(
            create_test_credential(1),
            Some(create_test_timestamp(1000)),
            None,
        );

        // Each task reads through its own clone of the state and hands back its index.
        let readers: Vec<_> = (0..10)
            .map(|task_index| {
                let shared = state.clone();
                tokio::spawn(async move {
                    let seen = shared.credential().await;
                    assert_eq!(seen.token_key_id, 1);
                    task_index
                })
            })
            .collect();

        // Join every reader; a panic inside a task surfaces through `unwrap()`.
        for reader in readers {
            let task_index = reader.await.unwrap();
            assert!(task_index < 10);
        }
    }

    /// Ten spawned writers update the state; the result must be one of the written values.
    #[tokio::test]
    async fn test_credential_state_update_concurrent() {
        let state = CredentialState::new(create_test_credential(1), None, None);

        // One writer per key id in 2..12, each updating through its own clone.
        let writers: Vec<_> = (2..12)
            .map(|key_id| {
                let shared = state.clone();
                let replacement = create_test_credential(key_id);
                tokio::spawn(async move {
                    shared.update(replacement, None, None).await;
                })
            })
            .collect();

        // Join every writer before inspecting the final state.
        for writer in writers {
            writer.await.unwrap();
        }

        // Which writer finished last is not deterministic, so only require that
        // the surviving key id is one of the values that was actually written.
        let last_written = state.credential().await;
        assert!((2..=11).contains(&last_written.token_key_id));
    }
}