actr_runtime/lifecycle/actr_node.rs
1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use crate::context_factory::ContextFactory;
4use crate::lifecycle::compat_lock::{CompatLockManager, CompatibilityCheck};
5use crate::transport::InprocTransportManager;
6#[cfg(feature = "opentelemetry")]
7use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
8use actr_framework::{Bytes, Workload};
9use actr_mailbox::{DeadLetterQueue, Mailbox};
10use actr_protocol::prost::Message as ProstMessage;
11use actr_protocol::{
12 AIdCredential, ActorResult, ActrId, ActrType, CandidateCompatibilityInfo, PayloadType,
13 RegisterRequest, RouteCandidatesRequest, RpcEnvelope, register_response,
14 route_candidates_request,
15};
16use futures_util::FutureExt;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::RwLock;
20use tokio_util::sync::CancellationToken;
21#[cfg(feature = "opentelemetry")]
22use tracing::Instrument as _;
23// Use types from sub-crates
24use crate::wire::webrtc::SignalingClient;
25// Use extension traits from actr-protocol
26use actr_protocol::{ActrIdExt, ActrTypeExt};
27// Use heartbeat functions
28use crate::lifecycle::heartbeat::heartbeat_task;
29
30// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
31// Service Discovery Result
32// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
33
34/// Result of a service discovery request with compatibility information.
35///
36/// This struct is returned by `discover_route_candidates` and provides
37/// detailed information about the compatibility status when fingerprint-based
38/// discovery is used.
39#[derive(Debug, Clone)]
40pub struct DiscoveryResult {
41 /// Ordered list of compatible candidates (best match first)
42 pub candidates: Vec<ActrId>,
43 /// True if at least one candidate has an exact fingerprint match
44 pub has_exact_match: bool,
45 /// True if system is in sub-healthy state (compatible but not exact match)
46 pub is_sub_healthy: bool,
47 /// Detailed compatibility info for each candidate (when fingerprint was provided)
48 pub compatibility_info: Vec<CandidateCompatibilityInfo>,
49}
50
51// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
52// Constants
53// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
54
55/// ActrNode - ActrSystem + Workload (1:1 composition)
56///
57/// # Generic Parameters
58/// - `W`: Workload type
59///
60/// # MessageDispatcher Association
61/// - Statically associated via W::Dispatcher
62/// - Does not store Dispatcher instance (not even ZST needed)
63/// - Dispatch calls entirely through type system
64pub struct ActrNode<W: Workload> {
65 /// Runtime configuration
66 pub(crate) config: actr_config::Config,
67
68 /// Workload instance (the only business logic)
69 pub(crate) workload: Arc<W>,
70
71 /// SQLite persistent mailbox
72 pub(crate) mailbox: Arc<dyn Mailbox>,
73
74 /// Dead Letter Queue for poison messages
75 pub(crate) dlq: Arc<dyn DeadLetterQueue>,
76
77 /// Context factory (created in start() after obtaining ActorId)
78 pub(crate) context_factory: Option<ContextFactory>,
79
80 /// Signaling client
81 pub(crate) signaling_client: Arc<dyn SignalingClient>,
82
83 /// Actor ID (obtained after startup)
84 pub(crate) actor_id: Option<ActrId>,
85
86 /// Actor Credential (obtained after startup, used for subsequent authentication messages)
87 pub(crate) credential_state: Option<CredentialState>,
88
89 /// Pre-shared key for TURN authentication (obtained from registration)
90 pub(crate) psk: Option<Bytes>,
91
92 /// WebRTC coordinator (created after startup)
93 pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,
94
95 /// WebRTC Gate (created after startup)
96 pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
97
98 /// Shell → Workload Transport Manager
99 ///
100 /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
101 pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,
102
103 /// Workload → Shell Transport Manager
104 ///
105 /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
106 pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,
107
108 /// Shutdown token for graceful shutdown
109 pub(crate) shutdown_token: CancellationToken,
110
111 /// Actr.lock.toml content (loaded at startup for fingerprint lookups)
112 pub(crate) actr_lock: Option<actr_config::lock::LockFile>,
113 /// Network event receiver (from NetworkEventHandle)
114 pub(crate) network_event_rx:
115 Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>>,
116
117 /// Network event result sender (to NetworkEventHandle)
118 pub(crate) network_event_result_tx:
119 Option<tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>>,
120}
121
122/// Credential state for shared access between tasks
123#[derive(Clone)]
124pub struct CredentialState {
125 inner: Arc<RwLock<CredentialStateInner>>,
126}
127
128#[derive(Clone)]
129struct CredentialStateInner {
130 credential: AIdCredential,
131 expires_at: Option<prost_types::Timestamp>,
132 /// This is updated together with credential when credential is refreshed
133 psk: Option<Bytes>,
134}
135
136impl CredentialState {
137 /// Create a new CredentialState with PSK
138 pub(crate) fn new(
139 credential: AIdCredential,
140 expires_at: Option<prost_types::Timestamp>,
141 psk: Option<Bytes>,
142 ) -> Self {
143 Self {
144 inner: Arc::new(RwLock::new(CredentialStateInner {
145 credential,
146 expires_at,
147 psk,
148 })),
149 }
150 }
151
152 pub async fn credential(&self) -> AIdCredential {
153 self.inner.read().await.credential.clone()
154 }
155
156 pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
157 self.inner.read().await.expires_at
158 }
159
160 /// Get the PSK for TURN authentication
161 pub async fn psk(&self) -> Option<Bytes> {
162 self.inner.read().await.psk.clone()
163 }
164
165 /// Update credential along with PSK
166 /// This should be called when credential is refreshed and a new PSK is provided
167 pub(crate) async fn update(
168 &self,
169 credential: AIdCredential,
170 expires_at: Option<prost_types::Timestamp>,
171 psk: Option<Bytes>,
172 ) {
173 let mut guard = self.inner.write().await;
174 guard.credential = credential;
175 guard.expires_at = expires_at;
176 if psk.is_some() {
177 guard.psk = psk;
178 }
179 }
180}
181
182/// Map ProtocolError to error code for ErrorResponse
183fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
184 use actr_protocol::ProtocolError;
185 match err {
186 ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
187 ProtocolError::Uri(_) => 400, // Bad Request - URI parsing error
188 ProtocolError::Name(_) => 400, // Bad Request - invalid name
189 ProtocolError::SerializationError(_) => 500, // Internal Server Error
190 ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
191 ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
192 ProtocolError::EncodeError(_) => 500, // Internal Server Error
193 ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
194 ProtocolError::TransportError(_) => 503, // Service Unavailable
195 ProtocolError::Timeout => 504, // Gateway Timeout
196 ProtocolError::TargetNotFound(_) => 404, // Not Found
197 ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
198 ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
199 }
200}
201
202/// Check ACL permission for incoming request
203///
204/// # Arguments
205/// - `caller_id`: The ActrId of the caller (None for local calls)
206/// - `target_id`: The ActrId of the target (self)
207/// - `acl`: ACL rules from configuration
208///
209/// # Returns
210/// - `Ok(true)`: Permission granted
211/// - `Ok(false)`: Permission denied
212/// - `Err`: ACL check failed (treat as deny)
213///
214/// # ACL Evaluation Logic
215/// 1. If no caller_id (local call), always allow
216/// 2. If no ACL configured, allow by default (permissive mode for backward compatibility)
217/// 3. If ACL configured but rules are empty, deny all (secure by default)
218/// 4. Iterate through ACL rules in order (first match wins)
219/// - Check if caller matches any principal in the rule
220/// - If matched, return the rule's permission (ALLOW/DENY)
221/// 5. If no rule matches, deny by default (secure by default)
222fn check_acl_permission(
223 caller_id: Option<&ActrId>,
224 target_id: &ActrId,
225 acl: Option<&actr_protocol::Acl>,
226) -> Result<bool, String> {
227 // 1. Local calls (no caller_id) are always allowed
228 if caller_id.is_none() {
229 tracing::trace!("ACL: Local call, allowing");
230 return Ok(true);
231 }
232
233 let caller = caller_id.unwrap();
234
235 // 2. No ACL configured - allow by default
236 let acl_rules = match acl {
237 Some(acl) => acl,
238 None => {
239 tracing::trace!(
240 "ACL: No ACL configured, allowing {} -> {}",
241 caller.to_string_repr(),
242 target_id.to_string_repr()
243 );
244 return Ok(true);
245 }
246 };
247
248 // 3. If ACL is configured but has no rules, deny all (secure by default)
249 if acl_rules.rules.is_empty() {
250 tracing::warn!(
251 "ACL: ACL configured but no rules defined, denying {} -> {} (default deny)",
252 caller.to_string_repr(),
253 target_id.to_string_repr()
254 );
255 return Ok(false);
256 }
257
258 // 4. Iterate through ACL rules (first match wins)
259 for (rule_idx, rule) in acl_rules.rules.iter().enumerate() {
260 // Check if caller matches any principal in this rule
261 let mut matched = false;
262
263 // If no principals specified, skip this rule (empty allow list = no match)
264 if rule.principals.is_empty() {
265 tracing::trace!(
266 "ACL: Rule {} has empty principals list, skipping (no match)",
267 rule_idx
268 );
269 continue;
270 }
271
272 // Check each principal
273 for principal in &rule.principals {
274 if matches_principal(caller, principal) {
275 matched = true;
276 tracing::trace!(
277 "ACL: Rule {} matched principal: caller={}, principal_realm={:?}, principal_type={:?}",
278 rule_idx,
279 caller.to_string_repr(),
280 principal.realm.as_ref().map(|r| r.realm_id),
281 principal.actr_type.as_ref().map(|t| &t.name)
282 );
283 break;
284 }
285 }
286
287 // If matched, return the permission
288 if matched {
289 let permission = rule.permission;
290 let is_allow = permission == actr_protocol::acl_rule::Permission::Allow as i32;
291
292 tracing::debug!(
293 "ACL: Rule {} matched, permission={} for {} -> {}",
294 rule_idx,
295 if is_allow { "ALLOW" } else { "DENY" },
296 caller.to_string_repr(),
297 target_id.to_string_repr()
298 );
299
300 return Ok(is_allow);
301 }
302 }
303
304 // 5. No rule matched - deny by default (secure by default)
305 tracing::warn!(
306 "ACL: No matching rule found, denying {} -> {} (default deny)",
307 caller.to_string_repr(),
308 target_id.to_string_repr()
309 );
310 Ok(false)
311}
312
313/// Check if a caller matches a principal
314///
315/// A principal matches if:
316/// - If principal.realm is specified, it must match caller.realm
317/// - If principal.actr_type is specified, it must match caller.type
318/// - If both are None, principal matches all (should not happen in practice)
319fn matches_principal(caller: &ActrId, principal: &actr_protocol::acl_rule::Principal) -> bool {
320 // Check realm match (if specified)
321 if let Some(ref principal_realm) = principal.realm
322 && caller.realm.realm_id != principal_realm.realm_id
323 {
324 return false;
325 }
326
327 // Check type match (if specified)
328 if let Some(ref principal_type) = principal.actr_type
329 && (caller.r#type.manufacturer != principal_type.manufacturer
330 || caller.r#type.name != principal_type.name)
331 {
332 return false;
333 }
334
335 // If we reach here, all specified fields matched
336 true
337}
338
339impl<W: Workload> ActrNode<W> {
340 /// Get Inproc Transport Manager
341 ///
342 /// # Returns
343 /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
344 /// - `None`: Not yet started (need to call start() first)
345 ///
346 /// # Use Cases
347 /// - Workload internals need to communicate with Shell
348 /// - Create custom LatencyFirst/MediaTrack channels
349 pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
350 self.inproc_mgr.clone()
351 }
352
353 /// Get ActorId (if registration has completed)
354 pub fn actor_id(&self) -> Option<&ActrId> {
355 self.actor_id.as_ref()
356 }
357
358 /// Get credential state (if registration has completed)
359 pub fn credential_state(&self) -> Option<CredentialState> {
360 self.credential_state.clone()
361 }
362
363 /// Get signaling client (for manual control such as UnregisterRequest)
364 pub fn signaling_client(&self) -> Arc<dyn SignalingClient> {
365 self.signaling_client.clone()
366 }
367
368 /// Get shutdown token for this node
369 pub fn shutdown_token(&self) -> CancellationToken {
370 self.shutdown_token.clone()
371 }
372
373 /// Discover remote actors of the specified type via signaling server.
374 ///
375 /// This method implements the full runtime compatibility negotiation workflow
376 /// as specified in the documentation:
377 ///
378 /// # Compatibility Negotiation Flow
379 ///
380 /// 1. **Step 0: Fast Path (compat.lock.toml)**
381 /// - Check if `compat.lock.toml` has a cached negotiation for this service
382 /// - If found and not expired, use the cached `resolved_fingerprint` directly
383 ///
384 /// 2. **Step 1: Ideal Path (Exact Match)**
385 /// - Read the expected fingerprint from `Actr.lock.toml`
386 /// - Request exact match from signaling server
387 /// - If found → connection success, system is HEALTHY
388 ///
389 /// 3. **Step 2: Trigger Negotiation (Match Failure)**
390 /// - If no exact match, enter compatibility negotiation mode
391 ///
392 /// 4. **Step 3: Compatibility Check (Server-side)**
393 /// - Server performs backward compatibility analysis using proto-sign
394 ///
395 /// 5. **Step 4: Decision**
396 /// - **Success**: Found compatible version → SUB-HEALTHY state
397 /// - Update `compat.lock.toml` with negotiation result
398 /// - Log warning: "SYSTEM SUB-HEALTHY"
399 /// - **Failure**: No compatible version → FAILED state
400 /// - Log error: "SYSTEM FAILED"
401 ///
402 /// # Arguments
403 /// - `target_type`: The ActrType of the target service to discover
404 /// - `candidate_count`: Maximum number of candidates to return
405 ///
406 /// # Returns
407 /// A `DiscoveryResult` containing candidates and compatibility information
408 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
409 pub async fn discover_route_candidates(
410 &self,
411 target_type: &ActrType,
412 candidate_count: u32,
413 ) -> ActorResult<DiscoveryResult> {
414 // Check if node is started (has actor_id and credential)
415 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
416 actr_protocol::ProtocolError::InvalidStateTransition(
417 "Node is not started. Call start() first.".to_string(),
418 )
419 })?;
420
421 // Check if the signaling client is connected
422 if !self.signaling_client.is_connected() {
423 return Err(actr_protocol::ProtocolError::TransportError(
424 "Signaling client is not connected.".to_string(),
425 ));
426 }
427
428 let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
429
430 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
431 // Step 0: Fast Path - Check compat.lock.toml for cached negotiation
432 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
433 let mut compat_lock_manager = CompatLockManager::new(self.config.config_dir.clone());
434 if let Ok(Some(compat_lock)) = compat_lock_manager.load().await {
435 if let Some(cached_entry) = compat_lock.find_valid_entry(&service_name) {
436 tracing::info!(
437 "⚡ Fast path: Using cached negotiation for '{}' (resolved: {})",
438 service_name,
439 &cached_entry.resolved_fingerprint
440 [..20.min(cached_entry.resolved_fingerprint.len())]
441 );
442
443 // Use the cached resolved_fingerprint to find candidates
444 let result = self
445 .send_discovery_request(
446 actor_id,
447 target_type,
448 candidate_count,
449 cached_entry.resolved_fingerprint.clone(),
450 )
451 .await?;
452
453 if !result.candidates.is_empty() {
454 tracing::info!(
455 "📊 服务发现结果 [{}]: {} 个候选 (快速路径, sub_healthy=true)",
456 service_name,
457 result.candidates.len()
458 );
459 return Ok(DiscoveryResult {
460 candidates: result.candidates,
461 has_exact_match: false, // Cached negotiation means not exact
462 is_sub_healthy: true, // Using compat.lock means sub-healthy
463 compatibility_info: result.compatibility_info,
464 });
465 }
466 // If fast path fails, fall through to normal discovery
467 tracing::warn!(
468 "⚠️ Fast path failed for '{}', falling back to normal discovery",
469 service_name
470 );
471 }
472 }
473
474 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
475 // Step 1: Get fingerprint from Actr.lock.toml (REQUIRED)
476 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
477 let client_fingerprint = self.get_dependency_fingerprint(target_type).ok_or_else(|| {
478 tracing::error!(
479 severity = 10,
480 error_category = "dependency_missing",
481 "❌ DEPENDENCY NOT FOUND: Service '{}' is not declared in Actr.lock.toml.\n\
482 Please run 'actr install' to generate the lock file with all dependencies.",
483 service_name
484 );
485 actr_protocol::ProtocolError::Actr(actr_protocol::ActrError::DependencyNotFound {
486 service_name: service_name.clone(),
487 message: format!(
488 "Dependency '{}' not found in Actr.lock.toml. Run 'actr install' to resolve dependencies.",
489 service_name
490 ),
491 })
492 })?;
493
494 tracing::debug!(
495 "📋 Found dependency fingerprint for '{}': {}",
496 service_name,
497 &client_fingerprint[..20.min(client_fingerprint.len())]
498 );
499
500 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
501 // Step 2: Send discovery request to signaling server
502 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
503 let result = self
504 .send_discovery_request(
505 actor_id,
506 target_type,
507 candidate_count,
508 client_fingerprint.clone(),
509 )
510 .await?;
511
512 let has_exact_match = result.has_exact_match;
513 let is_sub_healthy = result.is_sub_healthy;
514
515 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
516 // Step 3 & 4: Handle negotiation result
517 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
518 self.handle_negotiation_result(
519 target_type,
520 &client_fingerprint,
521 &result.compatibility_info,
522 has_exact_match,
523 is_sub_healthy,
524 )
525 .await;
526
527 // Log result
528 tracing::info!(
529 "📊 服务发现结果 [{}]: {} 个候选, exact_match={}, sub_healthy={}",
530 service_name,
531 result.candidates.len(),
532 has_exact_match,
533 is_sub_healthy
534 );
535
536 Ok(DiscoveryResult {
537 candidates: result.candidates,
538 has_exact_match,
539 is_sub_healthy,
540 compatibility_info: result.compatibility_info,
541 })
542 }
543
544 /// Get dependency fingerprint from Actr.lock.toml
545 fn get_dependency_fingerprint(&self, target_type: &ActrType) -> Option<String> {
546 let actr_lock = self.actr_lock.as_ref()?;
547
548 // Try different name formats to find the dependency
549 let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
550 let actr_type_name = format!("{}+{}", target_type.manufacturer, target_type.name);
551
552 // First try by service name
553 if let Some(dep) = actr_lock.get_dependency(&service_name) {
554 return Some(dep.fingerprint.clone());
555 }
556
557 // Try by actr_type format
558 if let Some(dep) = actr_lock.get_dependency(&actr_type_name) {
559 return Some(dep.fingerprint.clone());
560 }
561
562 // Try by just the name part
563 if let Some(dep) = actr_lock.get_dependency(&target_type.name) {
564 return Some(dep.fingerprint.clone());
565 }
566
567 // Search through all dependencies by actr_type field
568 for dep in &actr_lock.dependencies {
569 if dep.actr_type == actr_type_name || dep.actr_type == target_type.name {
570 return Some(dep.fingerprint.clone());
571 }
572 }
573
574 None
575 }
576
577 /// Internal: Send discovery request to signaling server
578 async fn send_discovery_request(
579 &self,
580 actor_id: &ActrId,
581 target_type: &ActrType,
582 candidate_count: u32,
583 client_fingerprint: String,
584 ) -> ActorResult<DiscoveryResult> {
585 let client = self.signaling_client.as_ref();
586
587 let criteria = route_candidates_request::NodeSelectionCriteria {
588 candidate_count,
589 ranking_factors: Vec::new(),
590 minimal_dependency_requirement: None,
591 minimal_health_requirement: None,
592 };
593
594 let route_request = RouteCandidatesRequest {
595 target_type: target_type.clone(),
596 criteria: Some(criteria),
597 client_location: None,
598 client_fingerprint,
599 };
600
601 let credential_state = self.credential_state.clone().ok_or_else(|| {
602 actr_protocol::ProtocolError::InvalidStateTransition(
603 "Node is not started. Call start() first.".to_string(),
604 )
605 })?;
606
607 let route_response = client
608 .send_route_candidates_request(
609 actor_id.clone(),
610 credential_state.credential().await,
611 route_request,
612 )
613 .await
614 .map_err(|e| {
615 actr_protocol::ProtocolError::TransportError(format!(
616 "Route candidates request failed: {e}"
617 ))
618 })?;
619
620 match route_response.result {
621 Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
622 Ok(DiscoveryResult {
623 candidates: success.candidates,
624 has_exact_match: success.has_exact_match.unwrap_or(false),
625 is_sub_healthy: success.is_sub_healthy.unwrap_or(false),
626 compatibility_info: success.compatibility_info,
627 })
628 }
629 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
630 Err(actr_protocol::ProtocolError::TransportError(format!(
631 "Route candidates error {}: {}",
632 err.code, err.message
633 )))
634 }
635 None => Err(actr_protocol::ProtocolError::TransportError(
636 "Invalid route candidates response: missing result".to_string(),
637 )),
638 }
639 }
640
641 /// Internal: Handle negotiation result - log warnings and update compat.lock.toml
642 async fn handle_negotiation_result(
643 &self,
644 target_type: &ActrType,
645 client_fingerprint: &str,
646 compatibility_info: &[CandidateCompatibilityInfo],
647 has_exact_match: bool,
648 is_sub_healthy: bool,
649 ) {
650 let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
651
652 // Log detailed compatibility info
653 tracing::info!(
654 "📊 服务发现结果 [{}]: {} 个候选, exact_match={}, sub_healthy={}",
655 service_name,
656 compatibility_info.len(),
657 has_exact_match,
658 is_sub_healthy
659 );
660
661 for info in compatibility_info {
662 let status = if info.is_exact_match.unwrap_or(false) {
663 "✅ 精确匹配"
664 } else if let Some(ref result) = info.analysis_result {
665 match result.level() {
666 actr_protocol::CompatibilityLevel::FullyCompatible => "✅ 完全兼容",
667 actr_protocol::CompatibilityLevel::BackwardCompatible => "⚠️ 向后兼容",
668 actr_protocol::CompatibilityLevel::BreakingChanges => "❌ 破坏性变更",
669 }
670 } else {
671 "❓ 未知"
672 };
673
674 tracing::debug!(
675 " - 候选 {}: {} (指纹: {})",
676 info.candidate_id.serial_number,
677 status,
678 &info.candidate_fingerprint[..20.min(info.candidate_fingerprint.len())]
679 );
680 }
681
682 // Handle sub-healthy state
683 if is_sub_healthy && !has_exact_match {
684 // Find the first compatible (non-exact) match for logging
685 if let Some(resolved) = compatibility_info.first() {
686 tracing::warn!(
687 "🟡 SYSTEM SUB-HEALTHY: Service '{}' using compatible fingerprint ({}) \
688 instead of exact match ({}). Run 'actr install --force-update' to restore health.",
689 service_name,
690 &resolved.candidate_fingerprint[..20.min(resolved.candidate_fingerprint.len())],
691 &client_fingerprint[..20.min(client_fingerprint.len())]
692 );
693
694 // Update compat.lock.toml
695 let mut manager = CompatLockManager::new(self.config.config_dir.clone());
696 if let Err(e) = manager
697 .record_negotiation(
698 &service_name,
699 client_fingerprint,
700 &resolved.candidate_fingerprint,
701 false, // not exact match
702 CompatibilityCheck::BackwardCompatible,
703 )
704 .await
705 {
706 tracing::warn!("Failed to update compat.lock.toml: {}", e);
707 }
708 }
709 } else if has_exact_match {
710 // Exact match found - try to clean up compat.lock.toml entry if exists
711 let mut manager = CompatLockManager::new(self.config.config_dir.clone());
712 if let Ok(Some(_)) = manager.load().await {
713 if let Some(resolved) = compatibility_info.first() {
714 if let Err(e) = manager
715 .record_negotiation(
716 &service_name,
717 client_fingerprint,
718 &resolved.candidate_fingerprint,
719 true, // exact match
720 CompatibilityCheck::ExactMatch,
721 )
722 .await
723 {
724 tracing::debug!("Could not update compat.lock.toml: {}", e);
725 }
726 }
727 }
728 }
729 }
730 /// 网络事件处理循环(后台任务)
731 ///
732 /// # 职责
733 /// - 从 Channel 接收网络事件
734 /// - 委托给 NetworkEventProcessor 处理
735 /// - 记录处理时间并发送结果
736 async fn network_event_loop(
737 mut event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>,
738 result_tx: tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>,
739 event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
740 shutdown_token: CancellationToken,
741 ) {
742 use crate::lifecycle::network_event::{NetworkEvent, NetworkEventResult};
743
744 tracing::info!("🔄 Network event loop started");
745
746 loop {
747 tokio::select! {
748 // 接收网络事件
749 Some(event) = event_rx.recv() => {
750 let start = std::time::Instant::now();
751
752 let result = match &event {
753 NetworkEvent::Available => {
754 event_processor.process_network_available().await
755 }
756 NetworkEvent::Lost => {
757 event_processor.process_network_lost().await
758 }
759 NetworkEvent::TypeChanged { is_wifi, is_cellular } => {
760 event_processor.process_network_type_changed(*is_wifi, *is_cellular).await
761 }
762 };
763
764 let duration_ms = start.elapsed().as_millis() as u64;
765
766 // 构造处理结果
767 let event_result = match result {
768 Ok(_) => NetworkEventResult::success(event.clone(), duration_ms),
769 Err(e) => NetworkEventResult::failure(event.clone(), e, duration_ms),
770 };
771
772 // 发送结果(忽略发送失败,避免阻塞)
773 if let Err(e) = result_tx.send(event_result).await {
774 tracing::warn!("Failed to send event result: {}", e);
775 }
776 }
777
778 // 监听关闭信号
779 _ = shutdown_token.cancelled() => {
780 tracing::info!("🛑 Network event loop shutting down");
781 break;
782 }
783 }
784 }
785 }
786
787 /// Handle incoming message envelope
788 ///
789 /// # Performance Analysis
790 /// 1. create_context: ~10ns
791 /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
792 /// 3. User business logic: variable
793 ///
794 /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
795 ///
796 /// # Zero-cost Abstraction
797 /// - Compiler can inline entire call chain
798 /// - Match branches can be directly expanded
799 /// - Final generated code approaches hand-written match expression
800 ///
801 /// # Parameters
802 /// - `envelope`: The RPC envelope containing the message
803 /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
804 ///
805 /// # caller_id Design
806 ///
807 /// **Why not in RpcEnvelope?**
808 /// - Transport layer (WebRTC/Mailbox) already knows the sender
809 /// - All connections are direct P2P (no intermediaries)
810 /// - Storing in envelope would be redundant duplication
811 ///
812 /// **How it works:**
813 /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
814 /// - Only decoded when creating Context (once per message)
815 /// - Shell calls pass `None` (local process, no remote caller)
816 /// - Remote calls decode from `MessageRecord.from`
817 ///
818 /// **trace_id vs request_id:**
819 /// - `trace_id`: Distributed tracing across entire call chain (A → B → C)
820 /// - `request_id`: Unique identifier for each request-response pair
821 /// - Both kept for flexibility in complex scenarios
822 /// - Single-hop calls: effectively identical
823 /// - Multi-hop calls: trace_id spans all hops, request_id per hop
824 #[cfg_attr(
825 feature = "opentelemetry",
826 tracing::instrument(skip_all, name = "ActrNode.handle_incoming")
827 )]
828 pub async fn handle_incoming(
829 &self,
830 envelope: RpcEnvelope,
831 caller_id: Option<&ActrId>,
832 ) -> ActorResult<Bytes> {
833 use actr_framework::MessageDispatcher;
834
835 // Log received message
836 if let Some(caller) = caller_id {
837 tracing::debug!(
838 "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
839 envelope.route_key,
840 caller.to_string_repr(),
841 envelope.request_id
842 );
843 } else {
844 tracing::debug!(
845 "📨 Handling incoming message: route_key={}, request_id={}",
846 envelope.route_key,
847 envelope.request_id
848 );
849 }
850
851 // 0. Get actor_id early for ACL check
852 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
853 actr_protocol::ProtocolError::InvalidStateTransition(
854 "Actor ID not set - node must be started before handling messages".to_string(),
855 )
856 })?;
857
858 // 0.1. ACL Permission Check (before processing message)
859 let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
860 .map_err(|err_msg| {
861 actr_protocol::ProtocolError::TransportError(format!(
862 "ACL check failed: {}",
863 err_msg
864 ))
865 })?;
866
867 if !acl_allowed {
868 tracing::warn!(
869 severity = 5,
870 error_category = "acl_denied",
871 request_id = %envelope.request_id,
872 route_key = %envelope.route_key,
873 caller = %caller_id.map(|c| c.to_string_repr()).unwrap_or_else(|| "<none>".to_string()),
874 "🚫 ACL: Permission denied"
875 );
876
877 return Err(actr_protocol::ProtocolError::Actr(
878 actr_protocol::ActrError::PermissionDenied {
879 message: format!(
880 "ACL denied: {} is not allowed to call {}",
881 caller_id
882 .map(|c| c.to_string_repr())
883 .unwrap_or_else(|| "<unknown>".to_string()),
884 actor_id.to_string_repr()
885 ),
886 },
887 ));
888 }
889
890 // 1. Create Context with caller_id from transport layer
891 let credential_state = self.credential_state.clone().ok_or_else(|| {
892 actr_protocol::ProtocolError::InvalidStateTransition(
893 "Credential not set - node must be started before handling messages".to_string(),
894 )
895 })?;
896 let ctx = self
897 .context_factory
898 .as_ref()
899 .expect("ContextFactory must be initialized in start()")
900 .create(
901 actor_id,
902 caller_id, // caller_id from transport layer (MessageRecord.from)
903 &envelope.request_id,
904 &credential_state.credential().await,
905 );
906
907 // 2. Static MessageRouter dispatch (zero-cost abstraction)
908 // Compiler will inline entire call chain, generating code close to hand-written match
909 //
910 // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
911 let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
912 &self.workload,
913 envelope.clone(),
914 &ctx,
915 ))
916 .catch_unwind()
917 .await;
918
919 let result = match result {
920 Ok(handler_result) => handler_result,
921 Err(panic_payload) => {
922 // Handler panicked - extract panic info
923 let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
924 s.to_string()
925 } else if let Some(s) = panic_payload.downcast_ref::<String>() {
926 s.clone()
927 } else {
928 "Unknown panic payload".to_string()
929 };
930
931 tracing::error!(
932 severity = 8,
933 error_category = "handler_panic",
934 route_key = envelope.route_key,
935 request_id = %envelope.request_id,
936 "❌ Handler panicked: {}",
937 panic_info
938 );
939
940 // Return DecodeFailure error with panic info
941 // (using DecodeFailure as a proxy for "cannot process message")
942 Err(actr_protocol::ProtocolError::Actr(
943 actr_protocol::ActrError::DecodeFailure {
944 message: format!("Handler panicked: {panic_info}"),
945 },
946 ))
947 }
948 };
949
950 // 3. Log result
951 match &result {
952 Ok(_) => tracing::debug!(
953 request_id = %envelope.request_id,
954 route_key = %envelope.route_key,
955 "✅ Message handled successfully"
956 ),
957 Err(e) => tracing::error!(
958 severity = 6,
959 error_category = "handler_error",
960 request_id = %envelope.request_id,
961 route_key = %envelope.route_key,
962 "❌ Message handling failed: {:?}", e
963 ),
964 }
965
966 result
967 }
968
969 /// Start the system
970 ///
971 /// # Startup Sequence
972 /// 1. Connect to signaling server and register Actor
973 /// 2. Initialize transport layer (WebRTC)
974 /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
975 /// 4. Start Mailbox processing loop (State Path serial processing)
976 /// 5. Start Transport (begin receiving messages)
977 /// 6. Create ActrRef for Shell to interact with Workload
978 ///
979 /// # Returns
980 /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
981 pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
982 tracing::info!("🚀 Starting ActrNode");
983
984 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
985 // 1. Connect to signaling server and register
986 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
987 tracing::info!("📡 Connecting to signaling server");
988 self.signaling_client.connect().await.map_err(|e| {
989 actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
990 })?;
991 tracing::info!("✅ Connected to signaling server");
992
993 // Get ActrType from configuration
994 let actr_type = self.config.actr_type().clone();
995 tracing::info!("📋 Actor type: {}", actr_type.to_string_repr());
996
997 // Calculate ServiceSpec from config exports
998 let service_spec = self.config.calculate_service_spec();
999 if let Some(ref spec) = service_spec {
1000 tracing::info!("📦 Service fingerprint: {}", spec.fingerprint);
1001 tracing::info!("📦 Service tags: {:?}", spec.tags);
1002 } else {
1003 tracing::info!("📦 No proto exports, ServiceSpec is None");
1004 }
1005
1006 // Construct protobuf RegisterRequest
1007 let register_request = RegisterRequest {
1008 actr_type: actr_type.clone(),
1009 realm: self.config.realm,
1010 service_spec,
1011 acl: self.config.acl.clone(),
1012 };
1013
1014 tracing::info!("📤 Registering actor with signaling server (protobuf)");
1015
1016 // Use send_register_request to send and wait for response
1017 let register_response = self
1018 .signaling_client
1019 .send_register_request(register_request)
1020 .await
1021 .map_err(|e| {
1022 actr_protocol::ProtocolError::TransportError(format!(
1023 "Actor registration failed: {e}"
1024 ))
1025 })?;
1026
1027 // Handle RegisterResponse oneof result
1028 //
1029 // Collect background task handles (including unregister task) so they can be managed
1030 // by ActrRefShared later.
1031 let mut task_handles = Vec::new();
1032
1033 match register_response.result {
1034 Some(register_response::Result::Success(register_ok)) => {
1035 let actor_id = register_ok.actr_id;
1036 let credential = register_ok.credential;
1037
1038 tracing::info!("✅ Registration successful");
1039 tracing::info!(
1040 "🆔 Assigned ActrId: {}",
1041 actr_protocol::ActrIdExt::to_string_repr(&actor_id)
1042 );
1043 tracing::info!(
1044 "🔐 Received credential (token_key_id: {})",
1045 credential.token_key_id
1046 );
1047 tracing::info!(
1048 "💓 Signaling heartbeat interval: {} seconds",
1049 register_ok.signaling_heartbeat_interval_secs
1050 );
1051
1052 // Log additional information (if available)
1053 if register_ok.psk.is_some() {
1054 tracing::debug!("🔑 Received PSK (bootstrap keying material)");
1055 }
1056
1057 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1058 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1059 }
1060
1061 // Store ActrId and Credential
1062 self.actor_id = Some(actor_id.clone());
1063 let credential_state = CredentialState::new(
1064 credential,
1065 register_ok.credential_expires_at,
1066 register_ok.psk.clone(),
1067 );
1068 self.credential_state = Some(credential_state.clone());
1069
1070 // Pass identity to signaling client so reconnect URLs carry auth info.
1071 self.signaling_client.set_actor_id(actor_id.clone()).await;
1072 self.signaling_client
1073 .set_credential_state(credential_state.clone())
1074 .await;
1075 // Store PSK and public_key for TURN authentication
1076 self.psk = register_ok.psk.clone();
1077
1078 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1079 // 1.2. Set actr_lock in ContextFactory for fingerprint lookups
1080 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1081 if let Some(actr_lock) = self.actr_lock.clone() {
1082 self.context_factory
1083 .as_mut()
1084 .expect("ContextFactory must exist")
1085 .set_actr_lock(actr_lock);
1086 tracing::info!(
1087 "✅ Actr.lock.toml set in ContextFactory for fingerprint lookups"
1088 );
1089 }
1090
1091 // Set config_dir in ContextFactory for compat.lock.toml Fast Path
1092 self.context_factory
1093 .as_mut()
1094 .expect("ContextFactory must exist")
1095 .set_config_dir(self.config.config_dir.clone());
1096 tracing::info!(
1097 "✅ config_dir set in ContextFactory for compat.lock.toml Fast Path"
1098 );
1099
1100 // Persist identity into ContextFactory for later Context creation
1101 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1102 // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
1103 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1104 let shell_to_workload = self
1105 .context_factory
1106 .as_ref()
1107 .expect("ContextFactory must exist")
1108 .shell_to_workload();
1109 let workload_to_shell = self
1110 .context_factory
1111 .as_ref()
1112 .expect("ContextFactory must exist")
1113 .workload_to_shell();
1114 self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
1115 self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
1116
1117 tracing::info!(
1118 "✅ Inproc infrastructure already ready (created in ActrSystem::new())"
1119 );
1120
1121 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1122 // 1.5. Create WebRTC infrastructure
1123 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1124 tracing::info!("🌐 Initializing WebRTC infrastructure");
1125
1126 // Get MediaFrameRegistry from ContextFactory
1127 let media_frame_registry = self
1128 .context_factory
1129 .as_ref()
1130 .expect("ContextFactory must exist")
1131 .media_frame_registry
1132 .clone();
1133
1134 // Create WebRtcCoordinator
1135 let coordinator =
1136 Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
1137 actor_id.clone(),
1138 credential_state.clone(),
1139 self.signaling_client.clone(),
1140 self.config.webrtc.clone(),
1141 self.config.realm.realm_id.clone(),
1142 media_frame_registry,
1143 ));
1144
1145 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1146 // 1.6. Create OutprocTransportManager + OutprocOutGate (新架构)
1147 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1148 tracing::info!("🏗️ Creating OutprocTransportManager with WebRTC support");
1149
1150 // Create DefaultWireBuilder with WebRTC coordinator
1151 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1152 let wire_builder_config = DefaultWireBuilderConfig {
1153 websocket_url_template: None, // WebSocket disabled for now
1154 enable_webrtc: true,
1155 enable_websocket: false,
1156 };
1157 let wire_builder = Arc::new(DefaultWireBuilder::new(
1158 Some(coordinator.clone()),
1159 wire_builder_config,
1160 ));
1161
1162 // Create OutprocTransportManager
1163 use crate::transport::OutprocTransportManager;
1164 let transport_manager =
1165 Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
1166
1167 // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
1168 use crate::outbound::{OutGate, OutprocOutGate};
1169 let outproc_gate = Arc::new(OutprocOutGate::new(
1170 transport_manager,
1171 Some(coordinator.clone()), // Enable MediaTrack support
1172 ));
1173 let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
1174
1175 tracing::info!("✅ OutprocTransportManager + OutprocOutGate initialized");
1176
1177 // Get DataStreamRegistry from ContextFactory
1178 let data_stream_registry = self
1179 .context_factory
1180 .as_ref()
1181 .expect("ContextFactory must exist")
1182 .data_stream_registry
1183 .clone();
1184
1185 // Create WebRtcGate with shared pending_requests and DataStreamRegistry
1186 let pending_requests = outproc_gate.get_pending_requests();
1187 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1188 coordinator.clone(),
1189 pending_requests,
1190 data_stream_registry,
1191 ));
1192
1193 // Set local_id
1194 gate.set_local_id(actor_id.clone()).await;
1195
1196 tracing::info!(
1197 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1198 );
1199
1200 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1201 // 1.7. Set outproc_gate in ContextFactory (completing initialization)
1202 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1203 tracing::info!("🔧 Setting outproc_gate in ContextFactory");
1204 self.context_factory
1205 .as_mut()
1206 .expect("ContextFactory must exist")
1207 .set_outproc_gate(outproc_gate_enum);
1208
1209 tracing::info!(
1210 "✅ ContextFactory fully initialized (inproc + outproc gates ready)"
1211 );
1212
1213 // Save references (WebRtcGate kept for backward compatibility if needed)
1214 self.webrtc_coordinator = Some(coordinator.clone());
1215 self.webrtc_gate = Some(gate.clone());
1216
1217 tracing::info!("✅ WebRTC infrastructure initialized");
1218
1219 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1220 // 1.7.5. Create shared state for credential management
1221 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1222 // Shared credential state initialized above; reused across tasks
1223
1224 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1225 // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
1226 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1227 {
1228 let shutdown = self.shutdown_token.clone();
1229 let client = self.signaling_client.clone();
1230 let actor_id_for_heartbeat = actor_id.clone();
1231 let credential_state_for_heartbeat = credential_state.clone();
1232 let mailbox_for_heartbeat = self.mailbox.clone();
1233
1234 // Use interval from registration response, default to 30s
1235 let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1236 let heartbeat_interval = if heartbeat_interval_secs > 0 {
1237 Duration::from_secs(heartbeat_interval_secs as u64)
1238 } else {
1239 Duration::from_secs(30)
1240 };
1241
1242 let heartbeat_handle = tokio::spawn(heartbeat_task(
1243 shutdown,
1244 client,
1245 actor_id_for_heartbeat,
1246 credential_state_for_heartbeat,
1247 mailbox_for_heartbeat,
1248 heartbeat_interval,
1249 ));
1250
1251 task_handles.push(heartbeat_handle);
1252 }
1253 tracing::info!(
1254 "✅ Heartbeat task started (interval: {}s)",
1255 register_ok.signaling_heartbeat_interval_secs
1256 );
1257
1258 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1259 // 1.8.5. Spawn network event processing loop
1260 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1261 if let (Some(event_rx), Some(result_tx)) = (
1262 self.network_event_rx.take(),
1263 self.network_event_result_tx.take(),
1264 ) {
1265 use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1266
1267 // 创建 DefaultNetworkEventProcessor
1268 let event_processor = Arc::new(DefaultNetworkEventProcessor::new(
1269 self.signaling_client.clone(),
1270 self.webrtc_coordinator.clone(),
1271 ));
1272
1273 let shutdown = self.shutdown_token.clone();
1274
1275 let network_event_handle = tokio::spawn(async move {
1276 Self::network_event_loop(event_rx, result_tx, event_processor, shutdown)
1277 .await;
1278 });
1279
1280 task_handles.push(network_event_handle);
1281 tracing::info!("✅ Network event loop started");
1282 }
1283
1284 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1285 // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
1286 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1287 //
1288 // This task:
1289 // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
1290 // - Then sends UnregisterRequest via signaling client with a timeout
1291 //
1292 // NOTE: we push its JoinHandle into task_handles so it can be aborted
1293 // by ActrRefShared::Drop if needed.
1294 {
1295 let shutdown = self.shutdown_token.clone();
1296 let client = self.signaling_client.clone();
1297 let actor_id_for_unreg = actor_id.clone();
1298 let credential_state_for_unreg = credential_state.clone();
1299 let webrtc_coordinator = self.webrtc_coordinator.clone();
1300
1301 let unregister_handle = tokio::spawn(async move {
1302 // Wait for shutdown signal
1303 shutdown.cancelled().await;
1304 tracing::info!(
1305 "📡 Shutdown signal received2, sending UnregisterRequest for Actor {:?}",
1306 actor_id_for_unreg
1307 );
1308
1309 // 1. 先关闭所有 WebRTC peer 连接(如果存在)
1310 if let Some(coord) = webrtc_coordinator {
1311 if let Err(e) = coord.close_all_peers().await {
1312 tracing::warn!(
1313 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1314 e
1315 );
1316 } else {
1317 tracing::info!(
1318 "✅ All WebRTC peers closed before UnregisterRequest"
1319 );
1320 }
1321 } else {
1322 tracing::debug!(
1323 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1324 );
1325 }
1326
1327 // 2. 再发送 UnregisterRequest,设置一个超时(例如 5 秒)
1328 let result = tokio::time::timeout(
1329 std::time::Duration::from_secs(5),
1330 client.send_unregister_request(
1331 actor_id_for_unreg.clone(),
1332 credential_state_for_unreg.credential().await,
1333 Some("Graceful shutdown".to_string()),
1334 ),
1335 )
1336 .await;
1337 tracing::info!("UnregisterRequest result: {:?}", result);
1338 match result {
1339 Ok(Ok(_)) => {
1340 tracing::info!(
1341 "✅ UnregisterRequest sent to signaling server for Actor {:?}",
1342 actor_id_for_unreg
1343 );
1344 }
1345 Ok(Err(e)) => {
1346 tracing::warn!(
1347 "⚠️ Failed to send UnregisterRequest for Actor {:?}: {}",
1348 actor_id_for_unreg,
1349 e
1350 );
1351 }
1352 Err(_) => {
1353 tracing::warn!(
1354 "⚠️ UnregisterRequest timeout (5s) for Actor {:?}",
1355 actor_id_for_unreg
1356 );
1357 }
1358 }
1359 });
1360
1361 task_handles.push(unregister_handle);
1362 }
1363 }
1364 Some(register_response::Result::Error(error)) => {
1365 tracing::error!(
1366 severity = 10,
1367 error_category = "registration_error",
1368 error_code = error.code,
1369 "❌ Registration failed: code={}, message={}",
1370 error.code,
1371 error.message
1372 );
1373 return Err(actr_protocol::ProtocolError::TransportError(format!(
1374 "Registration rejected: {} (code: {})",
1375 error.message, error.code
1376 )));
1377 }
1378 None => {
1379 tracing::error!(
1380 severity = 10,
1381 error_category = "registration_error",
1382 "❌ Registration response missing result"
1383 );
1384 return Err(actr_protocol::ProtocolError::TransportError(
1385 "Invalid registration response: missing result".to_string(),
1386 ));
1387 }
1388 }
1389
1390 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1391 // 2. Transport layer initialization (completed via WebRTC infrastructure)
1392 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1393 tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1394
1395 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1396 // 3.1 Convert to Arc (before starting background loops)
1397 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1398 // Clone actor_id before moving self into Arc
1399 let actor_id = self
1400 .actor_id
1401 .as_ref()
1402 .ok_or_else(|| {
1403 actr_protocol::ProtocolError::InvalidStateTransition(
1404 "Actor ID not set - registration must complete before starting node"
1405 .to_string(),
1406 )
1407 })?
1408 .clone();
1409 let credential_state = self.credential_state.clone().ok_or_else(|| {
1410 actr_protocol::ProtocolError::InvalidStateTransition(
1411 "Credential not set - node must be started before handling messages".to_string(),
1412 )
1413 })?;
1414
1415 let actor_id_for_shell = actor_id.clone();
1416 let shutdown_token = self.shutdown_token.clone();
1417 let node_ref = Arc::new(self);
1418
1419 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1420 // 3.5. Start WebRTC background loops (BEFORE on_start)
1421 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1422 // CRITICAL: Start signaling loop before on_start() to avoid deadlock
1423 // where on_start() tries to send messages but signaling loop isn't running
1424 tracing::info!("🚀 Starting WebRTC background loops");
1425
1426 // Start WebRtcCoordinator signaling loop
1427 if let Some(coordinator) = &node_ref.webrtc_coordinator {
1428 coordinator.clone().start().await.map_err(|e| {
1429 actr_protocol::ProtocolError::TransportError(format!(
1430 "WebRtcCoordinator start failed: {e}"
1431 ))
1432 })?;
1433 tracing::info!("✅ WebRtcCoordinator signaling loop started");
1434 }
1435
1436 // Start WebRtcGate message receive loop (route to Mailbox)
1437 if let Some(gate) = &node_ref.webrtc_gate {
1438 gate.start_receive_loop(node_ref.mailbox.clone())
1439 .await
1440 .map_err(|e| {
1441 actr_protocol::ProtocolError::TransportError(format!(
1442 "WebRtcGate receive loop start failed: {e}"
1443 ))
1444 })?;
1445 tracing::info!("✅ WebRtcGate → Mailbox routing started");
1446 }
1447
1448 tracing::info!("✅ WebRTC background loops started");
1449
1450 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1451 // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
1452 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1453 tracing::info!("🪝 Calling lifecycle hook: on_start");
1454
1455 let ctx = node_ref
1456 .context_factory
1457 .as_ref()
1458 .expect("ContextFactory must be initialized before on_start")
1459 .create(
1460 &actor_id,
1461 None, // caller_id
1462 "bootstrap", // request_id
1463 &credential_state.credential().await,
1464 );
1465 node_ref.workload.on_start(&ctx).await?;
1466 tracing::info!("✅ Lifecycle hook on_start completed");
1467
1468 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1469 // 4.6. Start Inproc receive loop (Shell → Workload)
1470 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1471 tracing::info!("🔄 Starting Inproc receive loop (Shell → Workload)");
1472 // Start Workload receive loop (Shell → Workload REQUEST)
1473 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
1474 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
1475 let node = node_ref.clone();
1476 let request_rx_lane = shell_to_workload
1477 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
1478 .await
1479 .map_err(|e| {
1480 actr_protocol::ProtocolError::TransportError(format!(
1481 "Failed to get Workload receive lane: {e}"
1482 ))
1483 })?;
1484 let response_tx = workload_to_shell.clone();
1485 let shutdown = shutdown_token.clone();
1486
1487 let inproc_handle = tokio::spawn(async move {
1488 loop {
1489 tokio::select! {
1490 _ = shutdown.cancelled() => {
1491 tracing::info!("📭 Workload receive loop (Shell → Workload) received shutdown signal");
1492 break;
1493 }
1494
1495 envelope_result = request_rx_lane.recv_envelope() => {
1496 match envelope_result {
1497 Ok(envelope) => {
1498 let request_id = envelope.request_id.clone();
1499 // Extract and set tracing context from envelope
1500 #[cfg(feature = "opentelemetry")]
1501 let span = {
1502 let span = tracing::info_span!("actrNode.lane.receive_rpc", request_id = %request_id);
1503 set_parent_from_rpc_envelope(&span, &envelope);
1504 span
1505 };
1506
1507 tracing::debug!("📨 Workload received REQUEST from Shell: request_id={}", request_id);
1508
1509 // Shell calls have no caller_id (local process communication)
1510 let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1511 #[cfg(feature = "opentelemetry")]
1512 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1513 match handle_incoming_fut.await {
1514 Ok(response_bytes) => {
1515 // Send RESPONSE back via workload_to_shell
1516 // Keep same route_key (no prefix needed - separate channels!)
1517 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1518 let mut response_envelope = RpcEnvelope {
1519 route_key: envelope.route_key.clone(),
1520 payload: Some(response_bytes),
1521 error: None,
1522 traceparent: None,
1523 tracestate: None,
1524 request_id: request_id.clone(),
1525 metadata: Vec::new(),
1526 timeout_ms: 30000,
1527 };
1528 // Inject tracing context
1529 #[cfg(feature = "opentelemetry")]
1530 inject_span_context_to_rpc(&span, &mut response_envelope);
1531
1532 // Send via Workload → Shell channel
1533 let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1534 #[cfg(feature = "opentelemetry")]
1535 let send_response_fut = send_response_fut.instrument(span.clone());
1536 if let Err(e) = send_response_fut.await {
1537 tracing::error!(
1538 severity = 7,
1539 error_category = "transport_error",
1540 request_id = %request_id,
1541 "❌ Failed to send RESPONSE to Shell: {:?}", e
1542 );
1543 }
1544 }
1545 Err(e) => {
1546 tracing::error!(
1547 severity = 6,
1548 error_category = "handler_error",
1549 request_id = %request_id,
1550 route_key = %envelope.route_key,
1551 "❌ Workload message handling failed: {:?}", e
1552 );
1553
1554 // Send error response (system-level error on envelope)
1555 let error_response = actr_protocol::ErrorResponse {
1556 code: protocol_error_to_code(&e),
1557 message: e.to_string(),
1558 };
1559
1560 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1561 let mut error_envelope = RpcEnvelope {
1562 route_key: envelope.route_key.clone(),
1563 payload: None,
1564 error: Some(error_response),
1565 traceparent: envelope.traceparent.clone(),
1566 tracestate: envelope.tracestate.clone(),
1567 request_id: request_id.clone(),
1568 metadata: Vec::new(),
1569 timeout_ms: 30000,
1570 };
1571 // Inject tracing context
1572 #[cfg(feature = "opentelemetry")]
1573 inject_span_context_to_rpc(&span, &mut error_envelope);
1574
1575 let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1576 #[cfg(feature = "opentelemetry")]
1577 let send_error_response_fut = send_error_response_fut.instrument(span);
1578 if let Err(e) = send_error_response_fut.await {
1579 tracing::error!(
1580 severity = 7,
1581 error_category = "transport_error",
1582 request_id = %request_id,
1583 "❌ Failed to send ERROR response to Shell: {:?}", e
1584 );
1585 }
1586 }
1587 }
1588 }
1589 Err(e) => {
1590 tracing::error!(
1591 severity = 8,
1592 error_category = "transport_error",
1593 "❌ Failed to receive from Shell → Workload lane: {:?}", e
1594 );
1595 break;
1596 }
1597 }
1598 }
1599 }
1600 }
1601 tracing::info!(
1602 "✅ Workload receive loop (Shell → Workload) terminated gracefully"
1603 );
1604 });
1605
1606 task_handles.push(inproc_handle);
1607 }
1608 }
1609 tracing::info!("✅ Workload receive loop (Shell → Workload REQUEST) started");
1610
1611 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1612 // 4.7. Start Shell receive loop (Workload → Shell RESPONSE)
1613 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1614 tracing::info!("🔄 Starting Shell receive loop (Workload → Shell RESPONSE)");
1615 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
1616 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
1617 let response_rx_lane = workload_to_shell
1618 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
1619 .await
1620 .map_err(|e| {
1621 actr_protocol::ProtocolError::TransportError(format!(
1622 "Failed to get Shell receive lane: {e}"
1623 ))
1624 })?;
1625 let request_mgr = shell_to_workload.clone();
1626 let shutdown = shutdown_token.clone();
1627
1628 let shell_receive_handle = tokio::spawn(async move {
1629 loop {
1630 tokio::select! {
1631 _ = shutdown.cancelled() => {
1632 tracing::info!("📭 Shell receive loop (Workload → Shell) received shutdown signal");
1633 break;
1634 }
1635
1636 envelope_result = response_rx_lane.recv_envelope() => {
1637 match envelope_result {
1638 Ok(envelope) => {
1639 tracing::debug!("📨 Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
1640
1641 // Check if response is success or error
1642 match (envelope.payload, envelope.error) {
1643 (Some(payload), None) => {
1644 // Success response
1645 if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
1646 tracing::warn!(
1647 severity = 4,
1648 error_category = "orphan_response",
1649 request_id = %envelope.request_id,
1650 "⚠️ No pending request found for response: {:?}", e
1651 );
1652 }
1653 }
1654 (None, Some(error)) => {
1655 // Error response - convert to ProtocolError and complete with error
1656 let protocol_err = actr_protocol::ProtocolError::TransportError(
1657 format!("RPC error {}: {}", error.code, error.message)
1658 );
1659 if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
1660 tracing::warn!(
1661 severity = 4,
1662 error_category = "orphan_response",
1663 request_id = %envelope.request_id,
1664 "⚠️ No pending request found for error response: {:?}", e
1665 );
1666 }
1667 }
1668 _ => {
1669 tracing::error!(
1670 severity = 7,
1671 error_category = "protocol_error",
1672 request_id = %envelope.request_id,
1673 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1674 );
1675 }
1676 }
1677 }
1678 Err(e) => {
1679 tracing::error!(
1680 severity = 8,
1681 error_category = "transport_error",
1682 "❌ Failed to receive from Workload → Shell lane: {:?}", e
1683 );
1684 break;
1685 }
1686 }
1687 }
1688 }
1689 }
1690 tracing::info!(
1691 "✅ Shell receive loop (Workload → Shell) terminated gracefully"
1692 );
1693 });
1694
1695 task_handles.push(shell_receive_handle);
1696 }
1697 }
1698 tracing::info!("✅ Shell receive loop (Workload → Shell RESPONSE) started");
1699
1700 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1701 // 5. Start Mailbox processing loop (State Path)
1702 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1703 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
1704 {
1705 let node = node_ref.clone();
1706 let mailbox = node_ref.mailbox.clone();
1707 let gate = node_ref.webrtc_gate.clone();
1708 let shutdown = shutdown_token.clone();
1709
1710 let mailbox_handle = tokio::spawn(async move {
1711 loop {
1712 tokio::select! {
1713 // Listen for shutdown signal
1714 _ = shutdown.cancelled() => {
1715 tracing::info!("📭 Mailbox loop received shutdown signal");
1716 break;
1717 }
1718
1719 // Dequeue messages (by priority)
1720 result = mailbox.dequeue() => {
1721 match result {
1722 Ok(messages) => {
1723 if messages.is_empty() {
1724 // Queue empty, sleep briefly
1725 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1726 continue;
1727 }
1728
1729 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
1730
1731 // Process messages one by one
1732 for msg_record in messages {
1733 // Deserialize RpcEnvelope (Protobuf)
1734 match RpcEnvelope::decode(&msg_record.payload[..]) {
1735 Ok(envelope) => {
1736 let request_id = envelope.request_id.clone();
1737 #[cfg(feature = "opentelemetry")]
1738 let span = {
1739 let span = tracing::info_span!("actrNode.mailbox.receive_rpc", request_id = %request_id);
1740 set_parent_from_rpc_envelope(&span, &envelope);
1741 span
1742 };
1743 tracing::debug!("📦 Processing message: request_id={}", request_id);
1744
1745 // Decode caller_id from MessageRecord.from (transport layer)
1746 let caller_id_result = ActrId::decode(&msg_record.from[..]);
1747 let caller_id_ref = caller_id_result.as_ref().ok();
1748
1749 if caller_id_ref.is_none() {
1750 tracing::warn!(
1751 request_id = %request_id,
1752 "⚠️ Failed to decode caller_id from MessageRecord.from"
1753 );
1754 }
1755
1756 // Call handle_incoming with caller_id from transport layer
1757 let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
1758 #[cfg(feature = "opentelemetry")]
1759 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1760 match handle_incoming_fut.await {
1761 Ok(response_bytes) => {
1762 // Send response (reuse request_id)
1763 if let Some(ref gate) = gate {
1764 // Use already decoded caller_id
1765 match caller_id_result {
1766 Ok(caller) => {
1767 // Construct response RpcEnvelope (reuse request_id!)
1768 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1769 let mut response_envelope = RpcEnvelope {
1770 request_id, // Reuse!
1771 route_key: envelope.route_key.clone(),
1772 payload: Some(response_bytes),
1773 error: None,
1774 traceparent: envelope.traceparent.clone(),
1775 tracestate: envelope.tracestate.clone(),
1776 metadata: Vec::new(), // Response doesn't need extra metadata
1777 timeout_ms: 30000,
1778 };
1779 // Inject tracing context
1780 #[cfg(feature = "opentelemetry")]
1781 inject_span_context_to_rpc(&span, &mut response_envelope);
1782
1783 let send_response_fut = gate.send_response(&caller, response_envelope);
1784 #[cfg(feature = "opentelemetry")]
1785 let send_response_fut = send_response_fut.instrument(span);
1786 if let Err(e) = send_response_fut.await {
1787 tracing::error!(
1788 severity = 7,
1789 error_category = "transport_error",
1790 request_id = %envelope.request_id,
1791 "❌ Failed to send response: {:?}", e
1792 );
1793 }
1794 }
1795 Err(e) => {
1796 tracing::error!(
1797 severity = 8,
1798 error_category = "protobuf_decode",
1799 request_id = %envelope.request_id,
1800 "❌ Failed to decode caller_id: {:?}", e
1801 );
1802 }
1803 }
1804 }
1805
1806 // ACK message
1807 if let Err(e) = mailbox.ack(msg_record.id).await {
1808 tracing::error!(
1809 severity = 9,
1810 error_category = "mailbox_error",
1811 request_id = %envelope.request_id,
1812 message_id = %msg_record.id,
1813 "❌ Mailbox ACK failed: {:?}", e
1814 );
1815 }
1816 }
1817 Err(e) => {
1818 tracing::error!(
1819 severity = 6,
1820 error_category = "handler_error",
1821 request_id = %envelope.request_id,
1822 route_key = %envelope.route_key,
1823 "❌ handle_incoming failed: {:?}", e
1824 );
1825 // ACK to avoid infinite retries
1826 // Application errors are caller's responsibility
1827 let _ = mailbox.ack(msg_record.id).await;
1828 }
1829 }
1830 }
1831 Err(e) => {
1832 // Poison message - cannot decode RpcEnvelope
1833 tracing::error!(
1834 severity = 9,
1835 error_category = "protobuf_decode",
1836 message_id = %msg_record.id,
1837 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
1838 );
1839
1840 // Write to Dead Letter Queue
1841 use actr_mailbox::DlqRecord;
1842 use chrono::Utc;
1843 use uuid::Uuid;
1844
1845 let dlq_record = DlqRecord {
1846 id: Uuid::new_v4(),
1847 original_message_id: Some(msg_record.id.to_string()),
1848 from: Some(msg_record.from.clone()),
1849 to: node.actor_id.as_ref().map(|id| {
1850 let mut buf = Vec::new();
1851 id.encode(&mut buf).unwrap();
1852 buf
1853 }),
1854 raw_bytes: msg_record.payload.clone(),
1855 error_message: format!("Protobuf decode failed: {e}"),
1856 error_category: "protobuf_decode".to_string(),
1857 trace_id: format!("mailbox-{}", msg_record.id), // Fallback trace_id
1858 request_id: None,
1859 created_at: Utc::now(),
1860 redrive_attempts: 0,
1861 last_redrive_at: None,
1862 context: Some(format!(
1863 r#"{{"source":"mailbox","priority":"{}"}}"#,
1864 match msg_record.priority {
1865 actr_mailbox::MessagePriority::High => "high",
1866 actr_mailbox::MessagePriority::Normal => "normal",
1867 }
1868 )),
1869 };
1870
1871 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
1872 tracing::error!(
1873 severity = 10,
1874 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
1875 );
1876 } else {
1877 tracing::warn!(
1878 severity = 9,
1879 "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
1880 );
1881 }
1882
1883 // ACK the poison message to remove from mailbox
1884 let _ = mailbox.ack(msg_record.id).await;
1885 }
1886 }
1887 }
1888 }
1889 Err(e) => {
1890 tracing::error!(
1891 severity = 9,
1892 error_category = "mailbox_error",
1893 "❌ Mailbox dequeue failed: {:?}", e
1894 );
1895 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1896 }
1897 }
1898 }
1899 }
1900 }
1901 tracing::info!("✅ Mailbox processing loop terminated gracefully");
1902 });
1903
1904 task_handles.push(mailbox_handle);
1905 }
1906 tracing::info!("✅ Mailbox processing loop started");
1907
1908 tracing::info!("✅ ActrNode started successfully");
1909
1910 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1911 // 6. Create ActrRef for Shell to interact with Workload
1912 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1913 use crate::actr_ref::{ActrRef, ActrRefShared};
1914 use crate::outbound::InprocOutGate;
1915
1916 // Create InprocOutGate from shell_to_workload transport manager
1917 let shell_to_workload = node_ref
1918 .inproc_mgr
1919 .clone()
1920 .expect("inproc_mgr must be initialized");
1921 let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
1922
1923 // Create ActrRefShared
1924 let actr_ref_shared = Arc::new(ActrRefShared {
1925 actor_id: actor_id_for_shell.clone(),
1926 inproc_gate,
1927 shutdown_token: shutdown_token.clone(),
1928 task_handles: tokio::sync::Mutex::new(task_handles),
1929 });
1930
1931 // Create ActrRef
1932 let actr_ref = ActrRef::new(actr_ref_shared, node_ref);
1933
1934 tracing::info!("✅ ActrRef created (Shell → Workload communication handle)");
1935
1936 Ok(actr_ref)
1937 }
1938}
1939
#[cfg(test)]
mod tests {
    //! Unit tests for `CredentialState`: construction, accessors, in-place
    //! update, and concurrent read/write access through cloned handles.

    use super::*;
    use actr_protocol::AIdCredential;
    use prost_types::Timestamp;

    /// Builds a credential with a fixed 4-byte token (`[1, 2, 3, 4]`) and the given key id.
    fn create_test_credential(token_key_id: u32) -> AIdCredential {
        AIdCredential {
            encrypted_token: vec![1, 2, 3, 4].into(),
            token_key_id,
        }
    }

    /// Builds a timestamp `seconds` after the Unix epoch with zero nanoseconds.
    fn create_test_timestamp(seconds: i64) -> Timestamp {
        Timestamp { seconds, nanos: 0 }
    }

    #[tokio::test]
    async fn test_credential_state_initialization() {
        let expires_at = Some(create_test_timestamp(1000));
        let state = CredentialState::new(create_test_credential(1), expires_at, None);

        let retrieved_credential = state.credential().await;
        assert_eq!(retrieved_credential.token_key_id, 1);
        assert_eq!(retrieved_credential.encrypted_token.as_ref(), &[1, 2, 3, 4]);

        assert_eq!(state.expires_at().await, expires_at);
    }

    #[tokio::test]
    async fn test_credential_state_without_expiration() {
        let state = CredentialState::new(create_test_credential(2), None, None);

        let retrieved_credential = state.credential().await;
        assert_eq!(retrieved_credential.token_key_id, 2);

        assert!(state.expires_at().await.is_none());
    }

    #[tokio::test]
    async fn test_credential_state_update() {
        let initial_expires_at = Some(create_test_timestamp(1000));
        let state = CredentialState::new(create_test_credential(1), initial_expires_at, None);

        // Verify initial state (credential and expiry)
        assert_eq!(state.credential().await.token_key_id, 1);
        assert_eq!(state.expires_at().await, initial_expires_at);

        // Replace both the credential and its expiry
        let credential2 = create_test_credential(2);
        let expires_at2 = Some(create_test_timestamp(2000));
        // Clone: `credential2` is compared against the stored value below.
        state.update(credential2.clone(), expires_at2, None).await;

        // Verify updated state
        let updated_credential = state.credential().await;
        assert_eq!(updated_credential.token_key_id, 2);
        assert_eq!(
            updated_credential.encrypted_token,
            credential2.encrypted_token
        );
        assert_eq!(state.expires_at().await, expires_at2);
    }

    #[tokio::test]
    async fn test_credential_state_concurrent_access() {
        let expires_at = Some(create_test_timestamp(1000));
        let state = CredentialState::new(create_test_credential(1), expires_at, None);

        // Spawn several readers sharing the state through cloned handles; each
        // must observe the unmodified credential and hand back its own index.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let state = state.clone();
                tokio::spawn(async move {
                    let cred = state.credential().await;
                    assert_eq!(cred.token_key_id, 1);
                    i
                })
            })
            .collect();

        // Join every reader; a panicking reader surfaces here via `unwrap`.
        // Handles are joined in spawn order, so each must return its own index.
        for (expected, handle) in handles.into_iter().enumerate() {
            assert_eq!(handle.await.unwrap(), expected);
        }
    }

    #[tokio::test]
    async fn test_credential_state_update_concurrent() {
        let state = CredentialState::new(create_test_credential(1), None, None);

        // Race ten writers, each installing a credential with key id in 2..12.
        let handles: Vec<_> = (2..12)
            .map(|key_id| {
                let state = state.clone();
                let credential = create_test_credential(key_id);
                tokio::spawn(async move {
                    state.update(credential, None, None).await;
                })
            })
            .collect();

        // Wait for all updates to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Which writer lands last is scheduling-dependent, so only assert that the
        // final credential is one of the written ones (not the initial key id 1).
        let final_credential = state.credential().await;
        assert!((2..12).contains(&final_credential.token_key_id));
    }
}
2062}