actr_runtime/lifecycle/
actr_node.rs

1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use crate::context_factory::ContextFactory;
4use crate::lifecycle::compat_lock::{CompatLockManager, CompatibilityCheck};
5use crate::transport::InprocTransportManager;
6#[cfg(feature = "opentelemetry")]
7use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
8use actr_framework::{Bytes, Workload};
9use actr_mailbox::{DeadLetterQueue, Mailbox};
10use actr_protocol::prost::Message as ProstMessage;
11use actr_protocol::{
12    AIdCredential, ActorResult, ActrId, ActrType, CandidateCompatibilityInfo, PayloadType,
13    RegisterRequest, RouteCandidatesRequest, RpcEnvelope, register_response,
14    route_candidates_request,
15};
16use futures_util::FutureExt;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::RwLock;
20use tokio_util::sync::CancellationToken;
21#[cfg(feature = "opentelemetry")]
22use tracing::Instrument as _;
23// Use types from sub-crates
24use crate::wire::webrtc::SignalingClient;
25// Use extension traits from actr-protocol
26use actr_protocol::{ActrIdExt, ActrTypeExt};
27// Use heartbeat functions
28use crate::lifecycle::heartbeat::heartbeat_task;
29
30// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
31// Service Discovery Result
32// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
33
34/// Result of a service discovery request with compatibility information.
35///
36/// This struct is returned by `discover_route_candidates` and provides
37/// detailed information about the compatibility status when fingerprint-based
38/// discovery is used.
39#[derive(Debug, Clone)]
40pub struct DiscoveryResult {
41    /// Ordered list of compatible candidates (best match first)
42    pub candidates: Vec<ActrId>,
43    /// True if at least one candidate has an exact fingerprint match
44    pub has_exact_match: bool,
45    /// True if system is in sub-healthy state (compatible but not exact match)
46    pub is_sub_healthy: bool,
47    /// Detailed compatibility info for each candidate (when fingerprint was provided)
48    pub compatibility_info: Vec<CandidateCompatibilityInfo>,
49}
50
51// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
52// Constants
53// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
54
55/// ActrNode - ActrSystem + Workload (1:1 composition)
56///
57/// # Generic Parameters
58/// - `W`: Workload type
59///
60/// # MessageDispatcher Association
61/// - Statically associated via W::Dispatcher
62/// - Does not store Dispatcher instance (not even ZST needed)
63/// - Dispatch calls entirely through type system
64pub struct ActrNode<W: Workload> {
65    /// Runtime configuration
66    pub(crate) config: actr_config::Config,
67
68    /// Workload instance (the only business logic)
69    pub(crate) workload: Arc<W>,
70
71    /// SQLite persistent mailbox
72    pub(crate) mailbox: Arc<dyn Mailbox>,
73
74    /// Dead Letter Queue for poison messages
75    pub(crate) dlq: Arc<dyn DeadLetterQueue>,
76
77    /// Context factory (created in start() after obtaining ActorId)
78    pub(crate) context_factory: Option<ContextFactory>,
79
80    /// Signaling client
81    pub(crate) signaling_client: Arc<dyn SignalingClient>,
82
83    /// Actor ID (obtained after startup)
84    pub(crate) actor_id: Option<ActrId>,
85
86    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
87    pub(crate) credential_state: Option<CredentialState>,
88
89    /// Pre-shared key for TURN authentication (obtained from registration)
90    pub(crate) psk: Option<Bytes>,
91
92    /// WebRTC coordinator (created after startup)
93    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,
94
95    /// WebRTC Gate (created after startup)
96    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
97
98    /// Shell → Workload Transport Manager
99    ///
100    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
101    pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,
102
103    /// Workload → Shell Transport Manager
104    ///
105    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
106    pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,
107
108    /// Shutdown token for graceful shutdown
109    pub(crate) shutdown_token: CancellationToken,
110
111    /// Actr.lock.toml content (loaded at startup for fingerprint lookups)
112    pub(crate) actr_lock: Option<actr_config::lock::LockFile>,
113    /// Network event receiver (from NetworkEventHandle)
114    pub(crate) network_event_rx:
115        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>>,
116
117    /// Network event result sender (to NetworkEventHandle)
118    pub(crate) network_event_result_tx:
119        Option<tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>>,
120}
121
122/// Credential state for shared access between tasks
123#[derive(Clone)]
124pub struct CredentialState {
125    inner: Arc<RwLock<CredentialStateInner>>,
126}
127
128#[derive(Clone)]
129struct CredentialStateInner {
130    credential: AIdCredential,
131    expires_at: Option<prost_types::Timestamp>,
132    /// This is updated together with credential when credential is refreshed
133    psk: Option<Bytes>,
134}
135
136impl CredentialState {
137    /// Create a new CredentialState with PSK
138    pub(crate) fn new(
139        credential: AIdCredential,
140        expires_at: Option<prost_types::Timestamp>,
141        psk: Option<Bytes>,
142    ) -> Self {
143        Self {
144            inner: Arc::new(RwLock::new(CredentialStateInner {
145                credential,
146                expires_at,
147                psk,
148            })),
149        }
150    }
151
152    pub async fn credential(&self) -> AIdCredential {
153        self.inner.read().await.credential.clone()
154    }
155
156    pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
157        self.inner.read().await.expires_at
158    }
159
160    /// Get the PSK for TURN authentication
161    pub async fn psk(&self) -> Option<Bytes> {
162        self.inner.read().await.psk.clone()
163    }
164
165    /// Update credential along with PSK
166    /// This should be called when credential is refreshed and a new PSK is provided
167    pub(crate) async fn update(
168        &self,
169        credential: AIdCredential,
170        expires_at: Option<prost_types::Timestamp>,
171        psk: Option<Bytes>,
172    ) {
173        let mut guard = self.inner.write().await;
174        guard.credential = credential;
175        guard.expires_at = expires_at;
176        if psk.is_some() {
177            guard.psk = psk;
178        }
179    }
180}
181
182/// Map ProtocolError to error code for ErrorResponse
183fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
184    use actr_protocol::ProtocolError;
185    match err {
186        ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
187        ProtocolError::Uri(_) => 400,  // Bad Request - URI parsing error
188        ProtocolError::Name(_) => 400, // Bad Request - invalid name
189        ProtocolError::SerializationError(_) => 500, // Internal Server Error
190        ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
191        ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
192        ProtocolError::EncodeError(_) => 500, // Internal Server Error
193        ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
194        ProtocolError::TransportError(_) => 503, // Service Unavailable
195        ProtocolError::Timeout => 504, // Gateway Timeout
196        ProtocolError::TargetNotFound(_) => 404, // Not Found
197        ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
198        ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
199    }
200}
201
202/// Check ACL permission for incoming request
203///
204/// # Arguments
205/// - `caller_id`: The ActrId of the caller (None for local calls)
206/// - `target_id`: The ActrId of the target (self)
207/// - `acl`: ACL rules from configuration
208///
209/// # Returns
210/// - `Ok(true)`: Permission granted
211/// - `Ok(false)`: Permission denied
212/// - `Err`: ACL check failed (treat as deny)
213///
214/// # ACL Evaluation Logic
215/// 1. If no caller_id (local call), always allow
216/// 2. If no ACL configured, allow by default (permissive mode for backward compatibility)
217/// 3. If ACL configured but rules are empty, deny all (secure by default)
218/// 4. Iterate through ACL rules in order (first match wins)
219///    - Check if caller matches any principal in the rule
220///    - If matched, return the rule's permission (ALLOW/DENY)
221/// 5. If no rule matches, deny by default (secure by default)
222fn check_acl_permission(
223    caller_id: Option<&ActrId>,
224    target_id: &ActrId,
225    acl: Option<&actr_protocol::Acl>,
226) -> Result<bool, String> {
227    // 1. Local calls (no caller_id) are always allowed
228    if caller_id.is_none() {
229        tracing::trace!("ACL: Local call, allowing");
230        return Ok(true);
231    }
232
233    let caller = caller_id.unwrap();
234
235    // 2. No ACL configured - allow by default
236    let acl_rules = match acl {
237        Some(acl) => acl,
238        None => {
239            tracing::trace!(
240                "ACL: No ACL configured, allowing {} -> {}",
241                caller.to_string_repr(),
242                target_id.to_string_repr()
243            );
244            return Ok(true);
245        }
246    };
247
248    // 3. If ACL is configured but has no rules, deny all (secure by default)
249    if acl_rules.rules.is_empty() {
250        tracing::warn!(
251            "ACL: ACL configured but no rules defined, denying {} -> {} (default deny)",
252            caller.to_string_repr(),
253            target_id.to_string_repr()
254        );
255        return Ok(false);
256    }
257
258    // 4. Iterate through ACL rules (first match wins)
259    for (rule_idx, rule) in acl_rules.rules.iter().enumerate() {
260        // Check if caller matches any principal in this rule
261        let mut matched = false;
262
263        // If no principals specified, skip this rule (empty allow list = no match)
264        if rule.principals.is_empty() {
265            tracing::trace!(
266                "ACL: Rule {} has empty principals list, skipping (no match)",
267                rule_idx
268            );
269            continue;
270        }
271
272        // Check each principal
273        for principal in &rule.principals {
274            if matches_principal(caller, principal) {
275                matched = true;
276                tracing::trace!(
277                    "ACL: Rule {} matched principal: caller={}, principal_realm={:?}, principal_type={:?}",
278                    rule_idx,
279                    caller.to_string_repr(),
280                    principal.realm.as_ref().map(|r| r.realm_id),
281                    principal.actr_type.as_ref().map(|t| &t.name)
282                );
283                break;
284            }
285        }
286
287        // If matched, return the permission
288        if matched {
289            let permission = rule.permission;
290            let is_allow = permission == actr_protocol::acl_rule::Permission::Allow as i32;
291
292            tracing::debug!(
293                "ACL: Rule {} matched, permission={} for {} -> {}",
294                rule_idx,
295                if is_allow { "ALLOW" } else { "DENY" },
296                caller.to_string_repr(),
297                target_id.to_string_repr()
298            );
299
300            return Ok(is_allow);
301        }
302    }
303
304    // 5. No rule matched - deny by default (secure by default)
305    tracing::warn!(
306        "ACL: No matching rule found, denying {} -> {} (default deny)",
307        caller.to_string_repr(),
308        target_id.to_string_repr()
309    );
310    Ok(false)
311}
312
313/// Check if a caller matches a principal
314///
315/// A principal matches if:
316/// - If principal.realm is specified, it must match caller.realm
317/// - If principal.actr_type is specified, it must match caller.type
318/// - If both are None, principal matches all (should not happen in practice)
319fn matches_principal(caller: &ActrId, principal: &actr_protocol::acl_rule::Principal) -> bool {
320    // Check realm match (if specified)
321    if let Some(ref principal_realm) = principal.realm
322        && caller.realm.realm_id != principal_realm.realm_id
323    {
324        return false;
325    }
326
327    // Check type match (if specified)
328    if let Some(ref principal_type) = principal.actr_type
329        && (caller.r#type.manufacturer != principal_type.manufacturer
330            || caller.r#type.name != principal_type.name)
331    {
332        return false;
333    }
334
335    // If we reach here, all specified fields matched
336    true
337}
338
339impl<W: Workload> ActrNode<W> {
340    /// Get Inproc Transport Manager
341    ///
342    /// # Returns
343    /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
344    /// - `None`: Not yet started (need to call start() first)
345    ///
346    /// # Use Cases
347    /// - Workload internals need to communicate with Shell
348    /// - Create custom LatencyFirst/MediaTrack channels
349    pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
350        self.inproc_mgr.clone()
351    }
352
353    /// Get ActorId (if registration has completed)
354    pub fn actor_id(&self) -> Option<&ActrId> {
355        self.actor_id.as_ref()
356    }
357
358    /// Get credential state (if registration has completed)
359    pub fn credential_state(&self) -> Option<CredentialState> {
360        self.credential_state.clone()
361    }
362
363    /// Get signaling client (for manual control such as UnregisterRequest)
364    pub fn signaling_client(&self) -> Arc<dyn SignalingClient> {
365        self.signaling_client.clone()
366    }
367
368    /// Get shutdown token for this node
369    pub fn shutdown_token(&self) -> CancellationToken {
370        self.shutdown_token.clone()
371    }
372
373    /// Discover remote actors of the specified type via signaling server.
374    ///
375    /// This method implements the full runtime compatibility negotiation workflow
376    /// as specified in the documentation:
377    ///
378    /// # Compatibility Negotiation Flow
379    ///
380    /// 1. **Step 0: Fast Path (compat.lock.toml)**
381    ///    - Check if `compat.lock.toml` has a cached negotiation for this service
382    ///    - If found and not expired, use the cached `resolved_fingerprint` directly
383    ///
384    /// 2. **Step 1: Ideal Path (Exact Match)**
385    ///    - Read the expected fingerprint from `Actr.lock.toml`
386    ///    - Request exact match from signaling server
387    ///    - If found → connection success, system is HEALTHY
388    ///
389    /// 3. **Step 2: Trigger Negotiation (Match Failure)**
390    ///    - If no exact match, enter compatibility negotiation mode
391    ///
392    /// 4. **Step 3: Compatibility Check (Server-side)**
393    ///    - Server performs backward compatibility analysis using proto-sign
394    ///
395    /// 5. **Step 4: Decision**
396    ///    - **Success**: Found compatible version → SUB-HEALTHY state
397    ///      - Update `compat.lock.toml` with negotiation result
398    ///      - Log warning: "SYSTEM SUB-HEALTHY"
399    ///    - **Failure**: No compatible version → FAILED state
400    ///      - Log error: "SYSTEM FAILED"
401    ///
402    /// # Arguments
403    /// - `target_type`: The ActrType of the target service to discover
404    /// - `candidate_count`: Maximum number of candidates to return
405    ///
406    /// # Returns
407    /// A `DiscoveryResult` containing candidates and compatibility information
408    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
409    pub async fn discover_route_candidates(
410        &self,
411        target_type: &ActrType,
412        candidate_count: u32,
413    ) -> ActorResult<DiscoveryResult> {
414        // Check if node is started (has actor_id and credential)
415        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
416            actr_protocol::ProtocolError::InvalidStateTransition(
417                "Node is not started. Call start() first.".to_string(),
418            )
419        })?;
420
421        // Check if the signaling client is connected
422        if !self.signaling_client.is_connected() {
423            return Err(actr_protocol::ProtocolError::TransportError(
424                "Signaling client is not connected.".to_string(),
425            ));
426        }
427
428        let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
429
430        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
431        // Step 0: Fast Path - Check compat.lock.toml for cached negotiation
432        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
433        let mut compat_lock_manager = CompatLockManager::new(self.config.config_dir.clone());
434        if let Ok(Some(compat_lock)) = compat_lock_manager.load().await {
435            if let Some(cached_entry) = compat_lock.find_valid_entry(&service_name) {
436                tracing::info!(
437                    "⚡ Fast path: Using cached negotiation for '{}' (resolved: {})",
438                    service_name,
439                    &cached_entry.resolved_fingerprint
440                        [..20.min(cached_entry.resolved_fingerprint.len())]
441                );
442
443                // Use the cached resolved_fingerprint to find candidates
444                let result = self
445                    .send_discovery_request(
446                        actor_id,
447                        target_type,
448                        candidate_count,
449                        cached_entry.resolved_fingerprint.clone(),
450                    )
451                    .await?;
452
453                if !result.candidates.is_empty() {
454                    tracing::info!(
455                        "📊 服务发现结果 [{}]: {} 个候选 (快速路径, sub_healthy=true)",
456                        service_name,
457                        result.candidates.len()
458                    );
459                    return Ok(DiscoveryResult {
460                        candidates: result.candidates,
461                        has_exact_match: false, // Cached negotiation means not exact
462                        is_sub_healthy: true,   // Using compat.lock means sub-healthy
463                        compatibility_info: result.compatibility_info,
464                    });
465                }
466                // If fast path fails, fall through to normal discovery
467                tracing::warn!(
468                    "⚠️ Fast path failed for '{}', falling back to normal discovery",
469                    service_name
470                );
471            }
472        }
473
474        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
475        // Step 1: Get fingerprint from Actr.lock.toml (REQUIRED)
476        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
477        let client_fingerprint = self.get_dependency_fingerprint(target_type).ok_or_else(|| {
478            tracing::error!(
479                severity = 10,
480                error_category = "dependency_missing",
481                "❌ DEPENDENCY NOT FOUND: Service '{}' is not declared in Actr.lock.toml.\n\
482                 Please run 'actr install' to generate the lock file with all dependencies.",
483                service_name
484            );
485            actr_protocol::ProtocolError::Actr(actr_protocol::ActrError::DependencyNotFound {
486                service_name: service_name.clone(),
487                message: format!(
488                    "Dependency '{}' not found in Actr.lock.toml. Run 'actr install' to resolve dependencies.",
489                    service_name
490                ),
491            })
492        })?;
493
494        tracing::debug!(
495            "📋 Found dependency fingerprint for '{}': {}",
496            service_name,
497            &client_fingerprint[..20.min(client_fingerprint.len())]
498        );
499
500        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
501        // Step 2: Send discovery request to signaling server
502        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
503        let result = self
504            .send_discovery_request(
505                actor_id,
506                target_type,
507                candidate_count,
508                client_fingerprint.clone(),
509            )
510            .await?;
511
512        let has_exact_match = result.has_exact_match;
513        let is_sub_healthy = result.is_sub_healthy;
514
515        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
516        // Step 3 & 4: Handle negotiation result
517        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
518        self.handle_negotiation_result(
519            target_type,
520            &client_fingerprint,
521            &result.compatibility_info,
522            has_exact_match,
523            is_sub_healthy,
524        )
525        .await;
526
527        // Log result
528        tracing::info!(
529            "📊 服务发现结果 [{}]: {} 个候选, exact_match={}, sub_healthy={}",
530            service_name,
531            result.candidates.len(),
532            has_exact_match,
533            is_sub_healthy
534        );
535
536        Ok(DiscoveryResult {
537            candidates: result.candidates,
538            has_exact_match,
539            is_sub_healthy,
540            compatibility_info: result.compatibility_info,
541        })
542    }
543
544    /// Get dependency fingerprint from Actr.lock.toml
545    fn get_dependency_fingerprint(&self, target_type: &ActrType) -> Option<String> {
546        let actr_lock = self.actr_lock.as_ref()?;
547
548        // Try different name formats to find the dependency
549        let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
550        let actr_type_name = format!("{}+{}", target_type.manufacturer, target_type.name);
551
552        // First try by service name
553        if let Some(dep) = actr_lock.get_dependency(&service_name) {
554            return Some(dep.fingerprint.clone());
555        }
556
557        // Try by actr_type format
558        if let Some(dep) = actr_lock.get_dependency(&actr_type_name) {
559            return Some(dep.fingerprint.clone());
560        }
561
562        // Try by just the name part
563        if let Some(dep) = actr_lock.get_dependency(&target_type.name) {
564            return Some(dep.fingerprint.clone());
565        }
566
567        // Search through all dependencies by actr_type field
568        for dep in &actr_lock.dependencies {
569            if dep.actr_type == actr_type_name || dep.actr_type == target_type.name {
570                return Some(dep.fingerprint.clone());
571            }
572        }
573
574        None
575    }
576
577    /// Internal: Send discovery request to signaling server
578    async fn send_discovery_request(
579        &self,
580        actor_id: &ActrId,
581        target_type: &ActrType,
582        candidate_count: u32,
583        client_fingerprint: String,
584    ) -> ActorResult<DiscoveryResult> {
585        let client = self.signaling_client.as_ref();
586
587        let criteria = route_candidates_request::NodeSelectionCriteria {
588            candidate_count,
589            ranking_factors: Vec::new(),
590            minimal_dependency_requirement: None,
591            minimal_health_requirement: None,
592        };
593
594        let route_request = RouteCandidatesRequest {
595            target_type: target_type.clone(),
596            criteria: Some(criteria),
597            client_location: None,
598            client_fingerprint,
599        };
600
601        let credential_state = self.credential_state.clone().ok_or_else(|| {
602            actr_protocol::ProtocolError::InvalidStateTransition(
603                "Node is not started. Call start() first.".to_string(),
604            )
605        })?;
606
607        let route_response = client
608            .send_route_candidates_request(
609                actor_id.clone(),
610                credential_state.credential().await,
611                route_request,
612            )
613            .await
614            .map_err(|e| {
615                actr_protocol::ProtocolError::TransportError(format!(
616                    "Route candidates request failed: {e}"
617                ))
618            })?;
619
620        match route_response.result {
621            Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
622                Ok(DiscoveryResult {
623                    candidates: success.candidates,
624                    has_exact_match: success.has_exact_match.unwrap_or(false),
625                    is_sub_healthy: success.is_sub_healthy.unwrap_or(false),
626                    compatibility_info: success.compatibility_info,
627                })
628            }
629            Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
630                Err(actr_protocol::ProtocolError::TransportError(format!(
631                    "Route candidates error {}: {}",
632                    err.code, err.message
633                )))
634            }
635            None => Err(actr_protocol::ProtocolError::TransportError(
636                "Invalid route candidates response: missing result".to_string(),
637            )),
638        }
639    }
640
641    /// Internal: Handle negotiation result - log warnings and update compat.lock.toml
642    async fn handle_negotiation_result(
643        &self,
644        target_type: &ActrType,
645        client_fingerprint: &str,
646        compatibility_info: &[CandidateCompatibilityInfo],
647        has_exact_match: bool,
648        is_sub_healthy: bool,
649    ) {
650        let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
651
652        // Log detailed compatibility info
653        tracing::info!(
654            "📊 服务发现结果 [{}]: {} 个候选, exact_match={}, sub_healthy={}",
655            service_name,
656            compatibility_info.len(),
657            has_exact_match,
658            is_sub_healthy
659        );
660
661        for info in compatibility_info {
662            let status = if info.is_exact_match.unwrap_or(false) {
663                "✅ 精确匹配"
664            } else if let Some(ref result) = info.analysis_result {
665                match result.level() {
666                    actr_protocol::CompatibilityLevel::FullyCompatible => "✅ 完全兼容",
667                    actr_protocol::CompatibilityLevel::BackwardCompatible => "⚠️ 向后兼容",
668                    actr_protocol::CompatibilityLevel::BreakingChanges => "❌ 破坏性变更",
669                }
670            } else {
671                "❓ 未知"
672            };
673
674            tracing::debug!(
675                "   - 候选 {}: {} (指纹: {})",
676                info.candidate_id.serial_number,
677                status,
678                &info.candidate_fingerprint[..20.min(info.candidate_fingerprint.len())]
679            );
680        }
681
682        // Handle sub-healthy state
683        if is_sub_healthy && !has_exact_match {
684            // Find the first compatible (non-exact) match for logging
685            if let Some(resolved) = compatibility_info.first() {
686                tracing::warn!(
687                    "🟡 SYSTEM SUB-HEALTHY: Service '{}' using compatible fingerprint ({}) \
688                     instead of exact match ({}). Run 'actr install --force-update' to restore health.",
689                    service_name,
690                    &resolved.candidate_fingerprint[..20.min(resolved.candidate_fingerprint.len())],
691                    &client_fingerprint[..20.min(client_fingerprint.len())]
692                );
693
694                // Update compat.lock.toml
695                let mut manager = CompatLockManager::new(self.config.config_dir.clone());
696                if let Err(e) = manager
697                    .record_negotiation(
698                        &service_name,
699                        client_fingerprint,
700                        &resolved.candidate_fingerprint,
701                        false, // not exact match
702                        CompatibilityCheck::BackwardCompatible,
703                    )
704                    .await
705                {
706                    tracing::warn!("Failed to update compat.lock.toml: {}", e);
707                }
708            }
709        } else if has_exact_match {
710            // Exact match found - try to clean up compat.lock.toml entry if exists
711            let mut manager = CompatLockManager::new(self.config.config_dir.clone());
712            if let Ok(Some(_)) = manager.load().await {
713                if let Some(resolved) = compatibility_info.first() {
714                    if let Err(e) = manager
715                        .record_negotiation(
716                            &service_name,
717                            client_fingerprint,
718                            &resolved.candidate_fingerprint,
719                            true, // exact match
720                            CompatibilityCheck::ExactMatch,
721                        )
722                        .await
723                    {
724                        tracing::debug!("Could not update compat.lock.toml: {}", e);
725                    }
726                }
727            }
728        }
729    }
730    /// 网络事件处理循环(后台任务)
731    ///
732    /// # 职责
733    /// - 从 Channel 接收网络事件
734    /// - 委托给 NetworkEventProcessor 处理
735    /// - 记录处理时间并发送结果
736    async fn network_event_loop(
737        mut event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>,
738        result_tx: tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>,
739        event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
740        shutdown_token: CancellationToken,
741    ) {
742        use crate::lifecycle::network_event::{NetworkEvent, NetworkEventResult};
743
744        tracing::info!("🔄 Network event loop started");
745
746        loop {
747            tokio::select! {
748                // 接收网络事件
749                Some(event) = event_rx.recv() => {
750                    let start = std::time::Instant::now();
751
752                    let result = match &event {
753                        NetworkEvent::Available => {
754                            event_processor.process_network_available().await
755                        }
756                        NetworkEvent::Lost => {
757                            event_processor.process_network_lost().await
758                        }
759                        NetworkEvent::TypeChanged { is_wifi, is_cellular } => {
760                            event_processor.process_network_type_changed(*is_wifi, *is_cellular).await
761                        }
762                    };
763
764                    let duration_ms = start.elapsed().as_millis() as u64;
765
766                    // 构造处理结果
767                    let event_result = match result {
768                        Ok(_) => NetworkEventResult::success(event.clone(), duration_ms),
769                        Err(e) => NetworkEventResult::failure(event.clone(), e, duration_ms),
770                    };
771
772                    // 发送结果(忽略发送失败,避免阻塞)
773                    if let Err(e) = result_tx.send(event_result).await {
774                        tracing::warn!("Failed to send event result: {}", e);
775                    }
776                }
777
778                // 监听关闭信号
779                _ = shutdown_token.cancelled() => {
780                    tracing::info!("🛑 Network event loop shutting down");
781                    break;
782                }
783            }
784        }
785    }
786
787    /// Handle incoming message envelope
788    ///
789    /// # Performance Analysis
790    /// 1. create_context: ~10ns
791    /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
792    /// 3. User business logic: variable
793    ///
794    /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
795    ///
796    /// # Zero-cost Abstraction
797    /// - Compiler can inline entire call chain
798    /// - Match branches can be directly expanded
799    /// - Final generated code approaches hand-written match expression
800    ///
801    /// # Parameters
802    /// - `envelope`: The RPC envelope containing the message
803    /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
804    ///
805    /// # caller_id Design
806    ///
807    /// **Why not in RpcEnvelope?**
808    /// - Transport layer (WebRTC/Mailbox) already knows the sender
809    /// - All connections are direct P2P (no intermediaries)
810    /// - Storing in envelope would be redundant duplication
811    ///
812    /// **How it works:**
813    /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
814    /// - Only decoded when creating Context (once per message)
815    /// - Shell calls pass `None` (local process, no remote caller)
816    /// - Remote calls decode from `MessageRecord.from`
817    ///
818    /// **trace_id vs request_id:**
819    /// - `trace_id`: Distributed tracing across entire call chain (A → B → C)
820    /// - `request_id`: Unique identifier for each request-response pair
821    /// - Both kept for flexibility in complex scenarios
822    /// - Single-hop calls: effectively identical
823    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
824    #[cfg_attr(
825        feature = "opentelemetry",
826        tracing::instrument(skip_all, name = "ActrNode.handle_incoming")
827    )]
828    pub async fn handle_incoming(
829        &self,
830        envelope: RpcEnvelope,
831        caller_id: Option<&ActrId>,
832    ) -> ActorResult<Bytes> {
833        use actr_framework::MessageDispatcher;
834
835        // Log received message
836        if let Some(caller) = caller_id {
837            tracing::debug!(
838                "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
839                envelope.route_key,
840                caller.to_string_repr(),
841                envelope.request_id
842            );
843        } else {
844            tracing::debug!(
845                "📨 Handling incoming message: route_key={}, request_id={}",
846                envelope.route_key,
847                envelope.request_id
848            );
849        }
850
851        // 0. Get actor_id early for ACL check
852        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
853            actr_protocol::ProtocolError::InvalidStateTransition(
854                "Actor ID not set - node must be started before handling messages".to_string(),
855            )
856        })?;
857
858        // 0.1. ACL Permission Check (before processing message)
859        let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
860            .map_err(|err_msg| {
861                actr_protocol::ProtocolError::TransportError(format!(
862                    "ACL check failed: {}",
863                    err_msg
864                ))
865            })?;
866
867        if !acl_allowed {
868            tracing::warn!(
869                severity = 5,
870                error_category = "acl_denied",
871                request_id = %envelope.request_id,
872                route_key = %envelope.route_key,
873                caller = %caller_id.map(|c| c.to_string_repr()).unwrap_or_else(|| "<none>".to_string()),
874                "🚫 ACL: Permission denied"
875            );
876
877            return Err(actr_protocol::ProtocolError::Actr(
878                actr_protocol::ActrError::PermissionDenied {
879                    message: format!(
880                        "ACL denied: {} is not allowed to call {}",
881                        caller_id
882                            .map(|c| c.to_string_repr())
883                            .unwrap_or_else(|| "<unknown>".to_string()),
884                        actor_id.to_string_repr()
885                    ),
886                },
887            ));
888        }
889
890        // 1. Create Context with caller_id from transport layer
891        let credential_state = self.credential_state.clone().ok_or_else(|| {
892            actr_protocol::ProtocolError::InvalidStateTransition(
893                "Credential not set - node must be started before handling messages".to_string(),
894            )
895        })?;
896        let ctx = self
897            .context_factory
898            .as_ref()
899            .expect("ContextFactory must be initialized in start()")
900            .create(
901                actor_id,
902                caller_id, // caller_id from transport layer (MessageRecord.from)
903                &envelope.request_id,
904                &credential_state.credential().await,
905            );
906
907        // 2. Static MessageRouter dispatch (zero-cost abstraction)
908        // Compiler will inline entire call chain, generating code close to hand-written match
909        //
910        // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
911        let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
912            &self.workload,
913            envelope.clone(),
914            &ctx,
915        ))
916        .catch_unwind()
917        .await;
918
919        let result = match result {
920            Ok(handler_result) => handler_result,
921            Err(panic_payload) => {
922                // Handler panicked - extract panic info
923                let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
924                    s.to_string()
925                } else if let Some(s) = panic_payload.downcast_ref::<String>() {
926                    s.clone()
927                } else {
928                    "Unknown panic payload".to_string()
929                };
930
931                tracing::error!(
932                    severity = 8,
933                    error_category = "handler_panic",
934                    route_key = envelope.route_key,
935                    request_id = %envelope.request_id,
936                    "❌ Handler panicked: {}",
937                    panic_info
938                );
939
940                // Return DecodeFailure error with panic info
941                // (using DecodeFailure as a proxy for "cannot process message")
942                Err(actr_protocol::ProtocolError::Actr(
943                    actr_protocol::ActrError::DecodeFailure {
944                        message: format!("Handler panicked: {panic_info}"),
945                    },
946                ))
947            }
948        };
949
950        // 3. Log result
951        match &result {
952            Ok(_) => tracing::debug!(
953                request_id = %envelope.request_id,
954                route_key = %envelope.route_key,
955                "✅ Message handled successfully"
956            ),
957            Err(e) => tracing::error!(
958                severity = 6,
959                error_category = "handler_error",
960                request_id = %envelope.request_id,
961                route_key = %envelope.route_key,
962                "❌ Message handling failed: {:?}", e
963            ),
964        }
965
966        result
967    }
968
969    /// Start the system
970    ///
971    /// # Startup Sequence
972    /// 1. Connect to signaling server and register Actor
973    /// 2. Initialize transport layer (WebRTC)
974    /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
975    /// 4. Start Mailbox processing loop (State Path serial processing)
976    /// 5. Start Transport (begin receiving messages)
977    /// 6. Create ActrRef for Shell to interact with Workload
978    ///
979    /// # Returns
980    /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
981    pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
982        tracing::info!("🚀 Starting ActrNode");
983
984        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
985        // 1. Connect to signaling server and register
986        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
987        tracing::info!("📡 Connecting to signaling server");
988        self.signaling_client.connect().await.map_err(|e| {
989            actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
990        })?;
991        tracing::info!("✅ Connected to signaling server");
992
993        // Get ActrType from configuration
994        let actr_type = self.config.actr_type().clone();
995        tracing::info!("📋 Actor type: {}", actr_type.to_string_repr());
996
997        // Calculate ServiceSpec from config exports
998        let service_spec = self.config.calculate_service_spec();
999        if let Some(ref spec) = service_spec {
1000            tracing::info!("📦 Service fingerprint: {}", spec.fingerprint);
1001            tracing::info!("📦 Service tags: {:?}", spec.tags);
1002        } else {
1003            tracing::info!("📦 No proto exports, ServiceSpec is None");
1004        }
1005
1006        // Construct protobuf RegisterRequest
1007        let register_request = RegisterRequest {
1008            actr_type: actr_type.clone(),
1009            realm: self.config.realm,
1010            service_spec,
1011            acl: self.config.acl.clone(),
1012        };
1013
1014        tracing::info!("📤 Registering actor with signaling server (protobuf)");
1015
1016        // Use send_register_request to send and wait for response
1017        let register_response = self
1018            .signaling_client
1019            .send_register_request(register_request)
1020            .await
1021            .map_err(|e| {
1022                actr_protocol::ProtocolError::TransportError(format!(
1023                    "Actor registration failed: {e}"
1024                ))
1025            })?;
1026
1027        // Handle RegisterResponse oneof result
1028        //
1029        // Collect background task handles (including unregister task) so they can be managed
1030        // by ActrRefShared later.
1031        let mut task_handles = Vec::new();
1032
1033        match register_response.result {
1034            Some(register_response::Result::Success(register_ok)) => {
1035                let actor_id = register_ok.actr_id;
1036                let credential = register_ok.credential;
1037
1038                tracing::info!("✅ Registration successful");
1039                tracing::info!(
1040                    "🆔 Assigned ActrId: {}",
1041                    actr_protocol::ActrIdExt::to_string_repr(&actor_id)
1042                );
1043                tracing::info!(
1044                    "🔐 Received credential (token_key_id: {})",
1045                    credential.token_key_id
1046                );
1047                tracing::info!(
1048                    "💓 Signaling heartbeat interval: {} seconds",
1049                    register_ok.signaling_heartbeat_interval_secs
1050                );
1051
1052                // Log additional information (if available)
1053                if register_ok.psk.is_some() {
1054                    tracing::debug!("🔑 Received PSK (bootstrap keying material)");
1055                }
1056
1057                if let Some(expires_at) = &register_ok.credential_expires_at {
1058                    tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1059                }
1060
1061                // Store ActrId and Credential
1062                self.actor_id = Some(actor_id.clone());
1063                let credential_state = CredentialState::new(
1064                    credential,
1065                    register_ok.credential_expires_at,
1066                    register_ok.psk.clone(),
1067                );
1068                self.credential_state = Some(credential_state.clone());
1069
1070                // Pass identity to signaling client so reconnect URLs carry auth info.
1071                self.signaling_client.set_actor_id(actor_id.clone()).await;
1072                self.signaling_client
1073                    .set_credential_state(credential_state.clone())
1074                    .await;
1075                // Store PSK and public_key for TURN authentication
1076                self.psk = register_ok.psk.clone();
1077
1078                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1079                // 1.2. Set actr_lock in ContextFactory for fingerprint lookups
1080                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1081                if let Some(actr_lock) = self.actr_lock.clone() {
1082                    self.context_factory
1083                        .as_mut()
1084                        .expect("ContextFactory must exist")
1085                        .set_actr_lock(actr_lock);
1086                    tracing::info!(
1087                        "✅ Actr.lock.toml set in ContextFactory for fingerprint lookups"
1088                    );
1089                }
1090
1091                // Set config_dir in ContextFactory for compat.lock.toml Fast Path
1092                self.context_factory
1093                    .as_mut()
1094                    .expect("ContextFactory must exist")
1095                    .set_config_dir(self.config.config_dir.clone());
1096                tracing::info!(
1097                    "✅ config_dir set in ContextFactory for compat.lock.toml Fast Path"
1098                );
1099
1100                // Persist identity into ContextFactory for later Context creation
1101                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1102                // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
1103                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1104                let shell_to_workload = self
1105                    .context_factory
1106                    .as_ref()
1107                    .expect("ContextFactory must exist")
1108                    .shell_to_workload();
1109                let workload_to_shell = self
1110                    .context_factory
1111                    .as_ref()
1112                    .expect("ContextFactory must exist")
1113                    .workload_to_shell();
1114                self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
1115                self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
1116
1117                tracing::info!(
1118                    "✅ Inproc infrastructure already ready (created in ActrSystem::new())"
1119                );
1120
1121                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1122                // 1.5. Create WebRTC infrastructure
1123                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1124                tracing::info!("🌐 Initializing WebRTC infrastructure");
1125
1126                // Get MediaFrameRegistry from ContextFactory
1127                let media_frame_registry = self
1128                    .context_factory
1129                    .as_ref()
1130                    .expect("ContextFactory must exist")
1131                    .media_frame_registry
1132                    .clone();
1133
1134                // Create WebRtcCoordinator
1135                let coordinator =
1136                    Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
1137                        actor_id.clone(),
1138                        credential_state.clone(),
1139                        self.signaling_client.clone(),
1140                        self.config.webrtc.clone(),
1141                        self.config.realm.realm_id.clone(),
1142                        media_frame_registry,
1143                    ));
1144
1145                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1146                // 1.6. Create OutprocTransportManager + OutprocOutGate (新架构)
1147                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1148                tracing::info!("🏗️  Creating OutprocTransportManager with WebRTC support");
1149
1150                // Create DefaultWireBuilder with WebRTC coordinator
1151                use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1152                let wire_builder_config = DefaultWireBuilderConfig {
1153                    websocket_url_template: None, // WebSocket disabled for now
1154                    enable_webrtc: true,
1155                    enable_websocket: false,
1156                };
1157                let wire_builder = Arc::new(DefaultWireBuilder::new(
1158                    Some(coordinator.clone()),
1159                    wire_builder_config,
1160                ));
1161
1162                // Create OutprocTransportManager
1163                use crate::transport::OutprocTransportManager;
1164                let transport_manager =
1165                    Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
1166
1167                // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
1168                use crate::outbound::{OutGate, OutprocOutGate};
1169                let outproc_gate = Arc::new(OutprocOutGate::new(
1170                    transport_manager,
1171                    Some(coordinator.clone()), // Enable MediaTrack support
1172                ));
1173                let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
1174
1175                tracing::info!("✅ OutprocTransportManager + OutprocOutGate initialized");
1176
1177                // Get DataStreamRegistry from ContextFactory
1178                let data_stream_registry = self
1179                    .context_factory
1180                    .as_ref()
1181                    .expect("ContextFactory must exist")
1182                    .data_stream_registry
1183                    .clone();
1184
1185                // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1186                let pending_requests = outproc_gate.get_pending_requests();
1187                let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1188                    coordinator.clone(),
1189                    pending_requests,
1190                    data_stream_registry,
1191                ));
1192
1193                // Set local_id
1194                gate.set_local_id(actor_id.clone()).await;
1195
1196                tracing::info!(
1197                    "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1198                );
1199
1200                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1201                // 1.7. Set outproc_gate in ContextFactory (completing initialization)
1202                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1203                tracing::info!("🔧 Setting outproc_gate in ContextFactory");
1204                self.context_factory
1205                    .as_mut()
1206                    .expect("ContextFactory must exist")
1207                    .set_outproc_gate(outproc_gate_enum);
1208
1209                tracing::info!(
1210                    "✅ ContextFactory fully initialized (inproc + outproc gates ready)"
1211                );
1212
1213                // Save references (WebRtcGate kept for backward compatibility if needed)
1214                self.webrtc_coordinator = Some(coordinator.clone());
1215                self.webrtc_gate = Some(gate.clone());
1216
1217                tracing::info!("✅ WebRTC infrastructure initialized");
1218
1219                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1220                // 1.7.5. Create shared state for credential management
1221                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1222                // Shared credential state initialized above; reused across tasks
1223
1224                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1225                // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1226                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1227                {
1228                    let shutdown = self.shutdown_token.clone();
1229                    let client = self.signaling_client.clone();
1230                    let actor_id_for_heartbeat = actor_id.clone();
1231                    let credential_state_for_heartbeat = credential_state.clone();
1232                    let mailbox_for_heartbeat = self.mailbox.clone();
1233
1234                    // Use interval from registration response, default to 30s
1235                    let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1236                    let heartbeat_interval = if heartbeat_interval_secs > 0 {
1237                        Duration::from_secs(heartbeat_interval_secs as u64)
1238                    } else {
1239                        Duration::from_secs(30)
1240                    };
1241
1242                    let heartbeat_handle = tokio::spawn(heartbeat_task(
1243                        shutdown,
1244                        client,
1245                        actor_id_for_heartbeat,
1246                        credential_state_for_heartbeat,
1247                        mailbox_for_heartbeat,
1248                        heartbeat_interval,
1249                    ));
1250
1251                    task_handles.push(heartbeat_handle);
1252                }
1253                tracing::info!(
1254                    "✅ Heartbeat task started (interval: {}s)",
1255                    register_ok.signaling_heartbeat_interval_secs
1256                );
1257
1258                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1259                // 1.8.5. Spawn network event processing loop
1260                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1261                if let (Some(event_rx), Some(result_tx)) = (
1262                    self.network_event_rx.take(),
1263                    self.network_event_result_tx.take(),
1264                ) {
1265                    use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1266
1267                    // 创建 DefaultNetworkEventProcessor
1268                    let event_processor = Arc::new(DefaultNetworkEventProcessor::new(
1269                        self.signaling_client.clone(),
1270                        self.webrtc_coordinator.clone(),
1271                    ));
1272
1273                    let shutdown = self.shutdown_token.clone();
1274
1275                    let network_event_handle = tokio::spawn(async move {
1276                        Self::network_event_loop(event_rx, result_tx, event_processor, shutdown)
1277                            .await;
1278                    });
1279
1280                    task_handles.push(network_event_handle);
1281                    tracing::info!("✅ Network event loop started");
1282                }
1283
1284                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1285                // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1286                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1287                //
1288                // This task:
1289                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1290                // - Then sends UnregisterRequest via signaling client with a timeout
1291                //
1292                // NOTE: we push its JoinHandle into task_handles so it can be aborted
1293                // by ActrRefShared::Drop if needed.
1294                {
1295                    let shutdown = self.shutdown_token.clone();
1296                    let client = self.signaling_client.clone();
1297                    let actor_id_for_unreg = actor_id.clone();
1298                    let credential_state_for_unreg = credential_state.clone();
1299                    let webrtc_coordinator = self.webrtc_coordinator.clone();
1300
1301                    let unregister_handle = tokio::spawn(async move {
1302                        // Wait for shutdown signal
1303                        shutdown.cancelled().await;
1304                        tracing::info!(
1305                            "📡 Shutdown signal received2, sending UnregisterRequest for Actor {:?}",
1306                            actor_id_for_unreg
1307                        );
1308
1309                        // 1. 先关闭所有 WebRTC peer 连接(如果存在)
1310                        if let Some(coord) = webrtc_coordinator {
1311                            if let Err(e) = coord.close_all_peers().await {
1312                                tracing::warn!(
1313                                    "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1314                                    e
1315                                );
1316                            } else {
1317                                tracing::info!(
1318                                    "✅ All WebRTC peers closed before UnregisterRequest"
1319                                );
1320                            }
1321                        } else {
1322                            tracing::debug!(
1323                                "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1324                            );
1325                        }
1326
1327                        // 2. 再发送 UnregisterRequest,设置一个超时(例如 5 秒)
1328                        let result = tokio::time::timeout(
1329                            std::time::Duration::from_secs(5),
1330                            client.send_unregister_request(
1331                                actor_id_for_unreg.clone(),
1332                                credential_state_for_unreg.credential().await,
1333                                Some("Graceful shutdown".to_string()),
1334                            ),
1335                        )
1336                        .await;
1337                        tracing::info!("UnregisterRequest result: {:?}", result);
1338                        match result {
1339                            Ok(Ok(_)) => {
1340                                tracing::info!(
1341                                    "✅ UnregisterRequest sent to signaling server for Actor {:?}",
1342                                    actor_id_for_unreg
1343                                );
1344                            }
1345                            Ok(Err(e)) => {
1346                                tracing::warn!(
1347                                    "⚠️ Failed to send UnregisterRequest for Actor {:?}: {}",
1348                                    actor_id_for_unreg,
1349                                    e
1350                                );
1351                            }
1352                            Err(_) => {
1353                                tracing::warn!(
1354                                    "⚠️ UnregisterRequest timeout (5s) for Actor {:?}",
1355                                    actor_id_for_unreg
1356                                );
1357                            }
1358                        }
1359                    });
1360
1361                    task_handles.push(unregister_handle);
1362                }
1363            }
1364            Some(register_response::Result::Error(error)) => {
1365                tracing::error!(
1366                    severity = 10,
1367                    error_category = "registration_error",
1368                    error_code = error.code,
1369                    "❌ Registration failed: code={}, message={}",
1370                    error.code,
1371                    error.message
1372                );
1373                return Err(actr_protocol::ProtocolError::TransportError(format!(
1374                    "Registration rejected: {} (code: {})",
1375                    error.message, error.code
1376                )));
1377            }
1378            None => {
1379                tracing::error!(
1380                    severity = 10,
1381                    error_category = "registration_error",
1382                    "❌ Registration response missing result"
1383                );
1384                return Err(actr_protocol::ProtocolError::TransportError(
1385                    "Invalid registration response: missing result".to_string(),
1386                ));
1387            }
1388        }
1389
1390        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1391        // 2. Transport layer initialization (completed via WebRTC infrastructure)
1392        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1393        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1394
1395        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1396        // 3.1 Convert to Arc (before starting background loops)
1397        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1398        // Clone actor_id before moving self into Arc
1399        let actor_id = self
1400            .actor_id
1401            .as_ref()
1402            .ok_or_else(|| {
1403                actr_protocol::ProtocolError::InvalidStateTransition(
1404                    "Actor ID not set - registration must complete before starting node"
1405                        .to_string(),
1406                )
1407            })?
1408            .clone();
1409        let credential_state = self.credential_state.clone().ok_or_else(|| {
1410            actr_protocol::ProtocolError::InvalidStateTransition(
1411                "Credential not set - node must be started before handling messages".to_string(),
1412            )
1413        })?;
1414
1415        let actor_id_for_shell = actor_id.clone();
1416        let shutdown_token = self.shutdown_token.clone();
1417        let node_ref = Arc::new(self);
1418
1419        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1420        // 3.5. Start WebRTC background loops (BEFORE on_start)
1421        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1422        // CRITICAL: Start signaling loop before on_start() to avoid deadlock
1423        // where on_start() tries to send messages but signaling loop isn't running
1424        tracing::info!("🚀 Starting WebRTC background loops");
1425
1426        // Start WebRtcCoordinator signaling loop
1427        if let Some(coordinator) = &node_ref.webrtc_coordinator {
1428            coordinator.clone().start().await.map_err(|e| {
1429                actr_protocol::ProtocolError::TransportError(format!(
1430                    "WebRtcCoordinator start failed: {e}"
1431                ))
1432            })?;
1433            tracing::info!("✅ WebRtcCoordinator signaling loop started");
1434        }
1435
1436        // Start WebRtcGate message receive loop (route to Mailbox)
1437        if let Some(gate) = &node_ref.webrtc_gate {
1438            gate.start_receive_loop(node_ref.mailbox.clone())
1439                .await
1440                .map_err(|e| {
1441                    actr_protocol::ProtocolError::TransportError(format!(
1442                        "WebRtcGate receive loop start failed: {e}"
1443                    ))
1444                })?;
1445            tracing::info!("✅ WebRtcGate → Mailbox routing started");
1446        }
1447
1448        tracing::info!("✅ WebRTC background loops started");
1449
1450        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1451        // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
1452        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1453        tracing::info!("🪝 Calling lifecycle hook: on_start");
1454
1455        let ctx = node_ref
1456            .context_factory
1457            .as_ref()
1458            .expect("ContextFactory must be initialized before on_start")
1459            .create(
1460                &actor_id,
1461                None,        // caller_id
1462                "bootstrap", // request_id
1463                &credential_state.credential().await,
1464            );
1465        node_ref.workload.on_start(&ctx).await?;
1466        tracing::info!("✅ Lifecycle hook on_start completed");
1467
1468        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1469        // 4.6. Start Inproc receive loop (Shell → Workload)
1470        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1471        tracing::info!("🔄 Starting Inproc receive loop (Shell → Workload)");
1472        // Start Workload receive loop (Shell → Workload REQUEST)
1473        if let Some(shell_to_workload) = &node_ref.inproc_mgr {
1474            if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
1475                let node = node_ref.clone();
1476                let request_rx_lane = shell_to_workload
1477                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
1478                    .await
1479                    .map_err(|e| {
1480                        actr_protocol::ProtocolError::TransportError(format!(
1481                            "Failed to get Workload receive lane: {e}"
1482                        ))
1483                    })?;
1484                let response_tx = workload_to_shell.clone();
1485                let shutdown = shutdown_token.clone();
1486
1487                let inproc_handle = tokio::spawn(async move {
1488                    loop {
1489                        tokio::select! {
1490                            _ = shutdown.cancelled() => {
1491                                tracing::info!("📭 Workload receive loop (Shell → Workload) received shutdown signal");
1492                                break;
1493                            }
1494
1495                            envelope_result = request_rx_lane.recv_envelope() => {
1496                                match envelope_result {
1497                                    Ok(envelope) => {
1498                                        let request_id = envelope.request_id.clone();
1499                                        // Extract and set tracing context from envelope
1500                                        #[cfg(feature = "opentelemetry")]
1501                                        let span = {
1502                                                let span = tracing::info_span!("actrNode.lane.receive_rpc", request_id = %request_id);
1503                                                set_parent_from_rpc_envelope(&span, &envelope);
1504                                                span
1505                                            };
1506
1507                                        tracing::debug!("📨 Workload received REQUEST from Shell: request_id={}", request_id);
1508
1509                                        // Shell calls have no caller_id (local process communication)
1510                                        let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1511                                        #[cfg(feature = "opentelemetry")]
1512                                        let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1513                                        match handle_incoming_fut.await {
1514                                            Ok(response_bytes) => {
1515                                                // Send RESPONSE back via workload_to_shell
1516                                                // Keep same route_key (no prefix needed - separate channels!)
1517                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1518                                                let mut response_envelope = RpcEnvelope {
1519                                                    route_key: envelope.route_key.clone(),
1520                                                    payload: Some(response_bytes),
1521                                                    error: None,
1522                                                    traceparent: None,
1523                                                    tracestate: None,
1524                                                    request_id: request_id.clone(),
1525                                                    metadata: Vec::new(),
1526                                                    timeout_ms: 30000,
1527                                                };
1528                                                // Inject tracing context
1529                                                #[cfg(feature = "opentelemetry")]
1530                                                inject_span_context_to_rpc(&span, &mut response_envelope);
1531
1532                                                // Send via Workload → Shell channel
1533                                                let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1534                                                #[cfg(feature = "opentelemetry")]
1535                                                let send_response_fut = send_response_fut.instrument(span.clone());
1536                                                if let Err(e) = send_response_fut.await {
1537                                                    tracing::error!(
1538                                                        severity = 7,
1539                                                        error_category = "transport_error",
1540                                                        request_id = %request_id,
1541                                                        "❌ Failed to send RESPONSE to Shell: {:?}", e
1542                                                    );
1543                                                }
1544                                            }
1545                                            Err(e) => {
1546                                                tracing::error!(
1547                                                    severity = 6,
1548                                                    error_category = "handler_error",
1549                                                    request_id = %request_id,
1550                                                    route_key = %envelope.route_key,
1551                                                    "❌ Workload message handling failed: {:?}", e
1552                                                );
1553
1554                                                // Send error response (system-level error on envelope)
1555                                                let error_response = actr_protocol::ErrorResponse {
1556                                                    code: protocol_error_to_code(&e),
1557                                                    message: e.to_string(),
1558                                                };
1559
1560                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1561                                                let mut error_envelope = RpcEnvelope {
1562                                                    route_key: envelope.route_key.clone(),
1563                                                    payload: None,
1564                                                    error: Some(error_response),
1565                                                    traceparent: envelope.traceparent.clone(),
1566                                                    tracestate: envelope.tracestate.clone(),
1567                                                    request_id: request_id.clone(),
1568                                                    metadata: Vec::new(),
1569                                                    timeout_ms: 30000,
1570                                                };
1571                                                // Inject tracing context
1572                                                #[cfg(feature = "opentelemetry")]
1573                                                inject_span_context_to_rpc(&span, &mut error_envelope);
1574
1575                                                let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1576                                                #[cfg(feature = "opentelemetry")]
1577                                                let send_error_response_fut = send_error_response_fut.instrument(span);
1578                                                if let Err(e) = send_error_response_fut.await {
1579                                                    tracing::error!(
1580                                                        severity = 7,
1581                                                        error_category = "transport_error",
1582                                                        request_id = %request_id,
1583                                                        "❌ Failed to send ERROR response to Shell: {:?}", e
1584                                                    );
1585                                                }
1586                                            }
1587                                        }
1588                                    }
1589                                    Err(e) => {
1590                                        tracing::error!(
1591                                            severity = 8,
1592                                            error_category = "transport_error",
1593                                            "❌ Failed to receive from Shell → Workload lane: {:?}", e
1594                                        );
1595                                        break;
1596                                    }
1597                                }
1598                            }
1599                        }
1600                    }
1601                    tracing::info!(
1602                        "✅ Workload receive loop (Shell → Workload) terminated gracefully"
1603                    );
1604                });
1605
1606                task_handles.push(inproc_handle);
1607            }
1608        }
1609        tracing::info!("✅ Workload receive loop (Shell → Workload REQUEST) started");
1610
1611        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1612        // 4.7. Start Shell receive loop (Workload → Shell RESPONSE)
1613        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1614        tracing::info!("🔄 Starting Shell receive loop (Workload → Shell RESPONSE)");
1615        if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
1616            if let Some(shell_to_workload) = &node_ref.inproc_mgr {
1617                let response_rx_lane = workload_to_shell
1618                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
1619                    .await
1620                    .map_err(|e| {
1621                        actr_protocol::ProtocolError::TransportError(format!(
1622                            "Failed to get Shell receive lane: {e}"
1623                        ))
1624                    })?;
1625                let request_mgr = shell_to_workload.clone();
1626                let shutdown = shutdown_token.clone();
1627
1628                let shell_receive_handle = tokio::spawn(async move {
1629                    loop {
1630                        tokio::select! {
1631                            _ = shutdown.cancelled() => {
1632                                tracing::info!("📭 Shell receive loop (Workload → Shell) received shutdown signal");
1633                                break;
1634                            }
1635
1636                            envelope_result = response_rx_lane.recv_envelope() => {
1637                                match envelope_result {
1638                                    Ok(envelope) => {
1639                                        tracing::debug!("📨 Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
1640
1641                                        // Check if response is success or error
1642                                        match (envelope.payload, envelope.error) {
1643                                            (Some(payload), None) => {
1644                                                // Success response
1645                                                if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
1646                                                    tracing::warn!(
1647                                                        severity = 4,
1648                                                        error_category = "orphan_response",
1649                                                        request_id = %envelope.request_id,
1650                                                        "⚠️  No pending request found for response: {:?}", e
1651                                                    );
1652                                                }
1653                                            }
1654                                            (None, Some(error)) => {
1655                                                // Error response - convert to ProtocolError and complete with error
1656                                                let protocol_err = actr_protocol::ProtocolError::TransportError(
1657                                                    format!("RPC error {}: {}", error.code, error.message)
1658                                                );
1659                                                if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
1660                                                    tracing::warn!(
1661                                                        severity = 4,
1662                                                        error_category = "orphan_response",
1663                                                        request_id = %envelope.request_id,
1664                                                        "⚠️  No pending request found for error response: {:?}", e
1665                                                    );
1666                                                }
1667                                            }
1668                                            _ => {
1669                                                tracing::error!(
1670                                                    severity = 7,
1671                                                    error_category = "protocol_error",
1672                                                    request_id = %envelope.request_id,
1673                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1674                                                );
1675                                            }
1676                                        }
1677                                    }
1678                                    Err(e) => {
1679                                        tracing::error!(
1680                                            severity = 8,
1681                                            error_category = "transport_error",
1682                                            "❌ Failed to receive from Workload → Shell lane: {:?}", e
1683                                        );
1684                                        break;
1685                                    }
1686                                }
1687                            }
1688                        }
1689                    }
1690                    tracing::info!(
1691                        "✅ Shell receive loop (Workload → Shell) terminated gracefully"
1692                    );
1693                });
1694
1695                task_handles.push(shell_receive_handle);
1696            }
1697        }
1698        tracing::info!("✅ Shell receive loop (Workload → Shell RESPONSE) started");
1699
1700        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1701        // 5. Start Mailbox processing loop (State Path)
1702        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1703        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
1704        {
1705            let node = node_ref.clone();
1706            let mailbox = node_ref.mailbox.clone();
1707            let gate = node_ref.webrtc_gate.clone();
1708            let shutdown = shutdown_token.clone();
1709
1710            let mailbox_handle = tokio::spawn(async move {
1711                loop {
1712                    tokio::select! {
1713                        // Listen for shutdown signal
1714                        _ = shutdown.cancelled() => {
1715                            tracing::info!("📭 Mailbox loop received shutdown signal");
1716                            break;
1717                        }
1718
1719                        // Dequeue messages (by priority)
1720                        result = mailbox.dequeue() => {
1721                            match result {
1722                                Ok(messages) => {
1723                                    if messages.is_empty() {
1724                                        // Queue empty, sleep briefly
1725                                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1726                                        continue;
1727                                    }
1728
1729                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
1730
1731                                    // Process messages one by one
1732                                    for msg_record in messages {
1733                                        // Deserialize RpcEnvelope (Protobuf)
1734                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
1735                                            Ok(envelope) => {
1736                                                let request_id = envelope.request_id.clone();
1737                                                #[cfg(feature = "opentelemetry")]
1738                                                let span = {
1739                                                        let span = tracing::info_span!("actrNode.mailbox.receive_rpc", request_id = %request_id);
1740                                                        set_parent_from_rpc_envelope(&span, &envelope);
1741                                                        span
1742                                                    };
1743                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
1744
1745                                                // Decode caller_id from MessageRecord.from (transport layer)
1746                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
1747                                                let caller_id_ref = caller_id_result.as_ref().ok();
1748
1749                                                if caller_id_ref.is_none() {
1750                                                    tracing::warn!(
1751                                                        request_id = %request_id,
1752                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
1753                                                    );
1754                                                }
1755
1756                                                // Call handle_incoming with caller_id from transport layer
1757                                                let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
1758                                                #[cfg(feature = "opentelemetry")]
1759                                                let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1760                                                match handle_incoming_fut.await {
1761                                                    Ok(response_bytes) => {
1762                                                        // Send response (reuse request_id)
1763                                                        if let Some(ref gate) = gate {
1764                                                            // Use already decoded caller_id
1765                                                            match caller_id_result {
1766                                                                Ok(caller) => {
1767                                                                    // Construct response RpcEnvelope (reuse request_id!)
1768                                                                    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1769                                                                    let mut response_envelope = RpcEnvelope {
1770                                                                        request_id,  // Reuse!
1771                                                                        route_key: envelope.route_key.clone(),
1772                                                                        payload: Some(response_bytes),
1773                                                                        error: None,
1774                                                                        traceparent: envelope.traceparent.clone(),
1775                                                                        tracestate: envelope.tracestate.clone(),
1776                                                                        metadata: Vec::new(),  // Response doesn't need extra metadata
1777                                                                        timeout_ms: 30000,
1778                                                                    };
1779                                                                    // Inject tracing context
1780                                                                    #[cfg(feature = "opentelemetry")]
1781                                                                    inject_span_context_to_rpc(&span, &mut response_envelope);
1782
1783                                                                    let send_response_fut = gate.send_response(&caller, response_envelope);
1784                                                                    #[cfg(feature = "opentelemetry")]
1785                                                                    let send_response_fut = send_response_fut.instrument(span);
1786                                                                    if let Err(e) = send_response_fut.await {
1787                                                                        tracing::error!(
1788                                                                            severity = 7,
1789                                                                            error_category = "transport_error",
1790                                                                            request_id = %envelope.request_id,
1791                                                                            "❌ Failed to send response: {:?}", e
1792                                                                        );
1793                                                                    }
1794                                                                }
1795                                                                Err(e) => {
1796                                                                    tracing::error!(
1797                                                                        severity = 8,
1798                                                                        error_category = "protobuf_decode",
1799                                                                        request_id = %envelope.request_id,
1800                                                                        "❌ Failed to decode caller_id: {:?}", e
1801                                                                    );
1802                                                                }
1803                                                            }
1804                                                        }
1805
1806                                                        // ACK message
1807                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
1808                                                            tracing::error!(
1809                                                                severity = 9,
1810                                                                error_category = "mailbox_error",
1811                                                                request_id = %envelope.request_id,
1812                                                                message_id = %msg_record.id,
1813                                                                "❌ Mailbox ACK failed: {:?}", e
1814                                                            );
1815                                                        }
1816                                                    }
1817                                                    Err(e) => {
1818                                                        tracing::error!(
1819                                                            severity = 6,
1820                                                            error_category = "handler_error",
1821                                                            request_id = %envelope.request_id,
1822                                                            route_key = %envelope.route_key,
1823                                                            "❌ handle_incoming failed: {:?}", e
1824                                                        );
1825                                                        // ACK to avoid infinite retries
1826                                                        // Application errors are caller's responsibility
1827                                                        let _ = mailbox.ack(msg_record.id).await;
1828                                                    }
1829                                                }
1830                                            }
1831                                            Err(e) => {
1832                                                // Poison message - cannot decode RpcEnvelope
1833                                                tracing::error!(
1834                                                    severity = 9,
1835                                                    error_category = "protobuf_decode",
1836                                                    message_id = %msg_record.id,
1837                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
1838                                                );
1839
1840                                                // Write to Dead Letter Queue
1841                                                use actr_mailbox::DlqRecord;
1842                                                use chrono::Utc;
1843                                                use uuid::Uuid;
1844
1845                                                let dlq_record = DlqRecord {
1846                                                    id: Uuid::new_v4(),
1847                                                    original_message_id: Some(msg_record.id.to_string()),
1848                                                    from: Some(msg_record.from.clone()),
1849                                                    to: node.actor_id.as_ref().map(|id| {
1850                                                        let mut buf = Vec::new();
1851                                                        id.encode(&mut buf).unwrap();
1852                                                        buf
1853                                                    }),
1854                                                    raw_bytes: msg_record.payload.clone(),
1855                                                    error_message: format!("Protobuf decode failed: {e}"),
1856                                                    error_category: "protobuf_decode".to_string(),
1857                                                    trace_id: format!("mailbox-{}", msg_record.id),  // Fallback trace_id
1858                                                    request_id: None,
1859                                                    created_at: Utc::now(),
1860                                                    redrive_attempts: 0,
1861                                                    last_redrive_at: None,
1862                                                    context: Some(format!(
1863                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
1864                                                        match msg_record.priority {
1865                                                            actr_mailbox::MessagePriority::High => "high",
1866                                                            actr_mailbox::MessagePriority::Normal => "normal",
1867                                                        }
1868                                                    )),
1869                                                };
1870
1871                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
1872                                                    tracing::error!(
1873                                                        severity = 10,
1874                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
1875                                                    );
1876                                                } else {
1877                                                    tracing::warn!(
1878                                                        severity = 9,
1879                                                        "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
1880                                                    );
1881                                                }
1882
1883                                                // ACK the poison message to remove from mailbox
1884                                                let _ = mailbox.ack(msg_record.id).await;
1885                                            }
1886                                        }
1887                                    }
1888                                }
1889                                Err(e) => {
1890                                    tracing::error!(
1891                                        severity = 9,
1892                                        error_category = "mailbox_error",
1893                                        "❌ Mailbox dequeue failed: {:?}", e
1894                                    );
1895                                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1896                                }
1897                            }
1898                        }
1899                    }
1900                }
1901                tracing::info!("✅ Mailbox processing loop terminated gracefully");
1902            });
1903
1904            task_handles.push(mailbox_handle);
1905        }
1906        tracing::info!("✅ Mailbox processing loop started");
1907
1908        tracing::info!("✅ ActrNode started successfully");
1909
1910        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1911        // 6. Create ActrRef for Shell to interact with Workload
1912        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1913        use crate::actr_ref::{ActrRef, ActrRefShared};
1914        use crate::outbound::InprocOutGate;
1915
1916        // Create InprocOutGate from shell_to_workload transport manager
1917        let shell_to_workload = node_ref
1918            .inproc_mgr
1919            .clone()
1920            .expect("inproc_mgr must be initialized");
1921        let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
1922
1923        // Create ActrRefShared
1924        let actr_ref_shared = Arc::new(ActrRefShared {
1925            actor_id: actor_id_for_shell.clone(),
1926            inproc_gate,
1927            shutdown_token: shutdown_token.clone(),
1928            task_handles: tokio::sync::Mutex::new(task_handles),
1929        });
1930
1931        // Create ActrRef
1932        let actr_ref = ActrRef::new(actr_ref_shared, node_ref);
1933
1934        tracing::info!("✅ ActrRef created (Shell → Workload communication handle)");
1935
1936        Ok(actr_ref)
1937    }
1938}
1939
#[cfg(test)]
mod tests {
    //! Unit tests for `CredentialState`: construction, reads, in-place updates,
    //! and behavior under concurrent readers and writers.

    use super::*;
    use actr_protocol::AIdCredential;
    use prost_types::Timestamp;

    /// Builds a credential carrying the fixed token bytes `[1, 2, 3, 4]` and the
    /// given `token_key_id`, so tests can tell credentials apart by key id.
    fn create_test_credential(token_key_id: u32) -> AIdCredential {
        AIdCredential {
            encrypted_token: vec![1, 2, 3, 4].into(),
            token_key_id,
        }
    }

    /// Builds a `Timestamp` at `seconds` with zero nanoseconds.
    fn create_test_timestamp(seconds: i64) -> Timestamp {
        Timestamp { seconds, nanos: 0 }
    }

    /// A freshly constructed state returns the credential and expiry it was built with.
    #[tokio::test]
    async fn test_credential_state_initialization() {
        let credential = create_test_credential(1);
        let expires_at = Some(create_test_timestamp(1000));

        // `credential` is not used again, so it is moved rather than cloned.
        let state = CredentialState::new(credential, expires_at, None);

        let retrieved_credential = state.credential().await;
        assert_eq!(retrieved_credential.token_key_id, 1);
        assert_eq!(retrieved_credential.encrypted_token.as_ref(), &[1, 2, 3, 4]);

        let retrieved_expires_at = state.expires_at().await;
        assert_eq!(retrieved_expires_at, expires_at);
    }

    /// Constructing without an expiry yields `None` from `expires_at()`.
    #[tokio::test]
    async fn test_credential_state_without_expiration() {
        let credential = create_test_credential(2);
        let state = CredentialState::new(credential, None, None);

        let retrieved_credential = state.credential().await;
        assert_eq!(retrieved_credential.token_key_id, 2);

        let retrieved_expires_at = state.expires_at().await;
        assert!(retrieved_expires_at.is_none());
    }

    /// `update` replaces both the credential and the expiry.
    #[tokio::test]
    async fn test_credential_state_update() {
        let credential1 = create_test_credential(1);
        let expires_at1 = Some(create_test_timestamp(1000));
        let state = CredentialState::new(credential1, expires_at1, None);

        // Verify initial state
        let initial_credential = state.credential().await;
        assert_eq!(initial_credential.token_key_id, 1);

        // Update credential; keep a copy so the stored token can be compared afterwards.
        let credential2 = create_test_credential(2);
        let expires_at2 = Some(create_test_timestamp(2000));
        state.update(credential2.clone(), expires_at2, None).await;

        // Verify updated state
        let updated_credential = state.credential().await;
        assert_eq!(updated_credential.token_key_id, 2);
        assert_eq!(
            updated_credential.encrypted_token,
            credential2.encrypted_token
        );

        let updated_expires_at = state.expires_at().await;
        assert_eq!(updated_expires_at, Some(create_test_timestamp(2000)));
    }

    /// Many cloned handles can read the same state concurrently and all observe
    /// the initial credential.
    #[tokio::test]
    async fn test_credential_state_concurrent_access() {
        let credential = create_test_credential(1);
        let expires_at = Some(create_test_timestamp(1000));
        let state = CredentialState::new(credential, expires_at, None);

        // Each reader reports the key id it observed. Asserting on the joined
        // results keeps the check in this test rather than buried in a task.
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let state = state.clone();
                tokio::spawn(async move {
                    let cred = state.credential().await;
                    cred.token_key_id
                })
            })
            .collect();

        for handle in handles {
            let observed_key_id = handle.await.expect("reader task panicked");
            assert_eq!(observed_key_id, 1);
        }
    }

    /// Concurrent `update` calls from cloned handles all complete, and the final
    /// state holds one of the written credentials.
    #[tokio::test]
    async fn test_credential_state_update_concurrent() {
        let credential1 = create_test_credential(1);
        let state = CredentialState::new(credential1, None, None);

        // Ten writers race to install key ids 2..=11.
        let written_key_ids = 2..12u32;
        let handles: Vec<_> = written_key_ids
            .clone()
            .map(|key_id| {
                let state = state.clone();
                let credential = create_test_credential(key_id);
                tokio::spawn(async move {
                    state.update(credential, None, None).await;
                })
            })
            .collect();

        // Wait for all updates to complete
        for handle in handles {
            handle.await.expect("writer task panicked");
        }

        // The writers are unordered, so which one lands last is unspecified.
        // Only assert that the final key id is one of the written values (in
        // particular, not the initial id 1).
        let final_credential = state.credential().await;
        assert!(written_key_ids.contains(&final_credential.token_key_id));
        assert_eq!(final_credential.encrypted_token.as_ref(), &[1, 2, 3, 4]);

        // Every writer passed `None` as the expiry.
        assert!(state.expires_at().await.is_none());
    }
}