actr_runtime/lifecycle/actr_node.rs
1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use crate::context_factory::ContextFactory;
4use crate::transport::InprocTransportManager;
5#[cfg(feature = "opentelemetry")]
6use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
7use actr_framework::{Bytes, Workload};
8use actr_mailbox::{DeadLetterQueue, Mailbox};
9use actr_protocol::prost::Message as ProstMessage;
10use actr_protocol::{
11 AIdCredential, ActorResult, ActrId, ActrType, PayloadType, RegisterRequest,
12 RouteCandidatesRequest, RpcEnvelope, register_response, route_candidates_request,
13};
14use futures_util::FutureExt;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::RwLock;
18use tokio_util::sync::CancellationToken;
19#[cfg(feature = "opentelemetry")]
20use tracing::Instrument as _;
21// Use types from sub-crates
22use crate::wire::webrtc::SignalingClient;
23// Use extension traits from actr-protocol
24use actr_protocol::{ActrIdExt, ActrTypeExt};
25// Use heartbeat functions
26use crate::lifecycle::heartbeat::heartbeat_task;
27
28// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
29// Constants
30// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
31
32/// ActrNode - ActrSystem + Workload (1:1 composition)
33///
34/// # Generic Parameters
35/// - `W`: Workload type
36///
37/// # MessageDispatcher Association
38/// - Statically associated via W::Dispatcher
39/// - Does not store Dispatcher instance (not even ZST needed)
40/// - Dispatch calls entirely through type system
41pub struct ActrNode<W: Workload> {
42 /// Runtime configuration
43 pub(crate) config: actr_config::Config,
44
45 /// Workload instance (the only business logic)
46 pub(crate) workload: Arc<W>,
47
48 /// SQLite persistent mailbox
49 pub(crate) mailbox: Arc<dyn Mailbox>,
50
51 /// Dead Letter Queue for poison messages
52 pub(crate) dlq: Arc<dyn DeadLetterQueue>,
53
54 /// Context factory (created in start() after obtaining ActorId)
55 pub(crate) context_factory: Option<ContextFactory>,
56
57 /// Signaling client
58 pub(crate) signaling_client: Arc<dyn SignalingClient>,
59
60 /// Actor ID (obtained after startup)
61 pub(crate) actor_id: Option<ActrId>,
62
63 /// Actor Credential (obtained after startup, used for subsequent authentication messages)
64 pub(crate) credential_state: Option<CredentialState>,
65
66 /// Pre-shared key for TURN authentication (obtained from registration)
67 pub(crate) psk: Option<Bytes>,
68
69 /// WebRTC coordinator (created after startup)
70 pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,
71
72 /// WebRTC Gate (created after startup)
73 pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
74
75 /// Shell → Workload Transport Manager
76 ///
77 /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
78 pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,
79
80 /// Workload → Shell Transport Manager
81 ///
82 /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
83 pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,
84
85 /// Shutdown token for graceful shutdown
86 pub(crate) shutdown_token: CancellationToken,
87}
88
89/// Credential state for shared access between tasks
90#[derive(Clone)]
91pub struct CredentialState {
92 inner: Arc<RwLock<CredentialStateInner>>,
93}
94
95#[derive(Clone)]
96struct CredentialStateInner {
97 credential: AIdCredential,
98 expires_at: Option<prost_types::Timestamp>,
99 /// This is updated together with credential when credential is refreshed
100 psk: Option<Bytes>,
101}
102
103impl CredentialState {
104 /// Create a new CredentialState with PSK
105 fn new(
106 credential: AIdCredential,
107 expires_at: Option<prost_types::Timestamp>,
108 psk: Option<Bytes>,
109 ) -> Self {
110 Self {
111 inner: Arc::new(RwLock::new(CredentialStateInner {
112 credential,
113 expires_at,
114 psk,
115 })),
116 }
117 }
118
119 pub async fn credential(&self) -> AIdCredential {
120 self.inner.read().await.credential.clone()
121 }
122
123 pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
124 self.inner.read().await.expires_at
125 }
126
127 /// Get the PSK for TURN authentication
128 pub async fn psk(&self) -> Option<Bytes> {
129 self.inner.read().await.psk.clone()
130 }
131
132 /// Update credential along with PSK
133 /// This should be called when credential is refreshed and a new PSK is provided
134 pub(crate) async fn update(
135 &self,
136 credential: AIdCredential,
137 expires_at: Option<prost_types::Timestamp>,
138 psk: Option<Bytes>,
139 ) {
140 let mut guard = self.inner.write().await;
141 guard.credential = credential;
142 guard.expires_at = expires_at;
143 if psk.is_some() {
144 guard.psk = psk;
145 }
146 }
147}
148
149/// Map ProtocolError to error code for ErrorResponse
150fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
151 use actr_protocol::ProtocolError;
152 match err {
153 ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
154 ProtocolError::Uri(_) => 400, // Bad Request - URI parsing error
155 ProtocolError::Name(_) => 400, // Bad Request - invalid name
156 ProtocolError::SerializationError(_) => 500, // Internal Server Error
157 ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
158 ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
159 ProtocolError::EncodeError(_) => 500, // Internal Server Error
160 ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
161 ProtocolError::TransportError(_) => 503, // Service Unavailable
162 ProtocolError::Timeout => 504, // Gateway Timeout
163 ProtocolError::TargetNotFound(_) => 404, // Not Found
164 ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
165 ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
166 }
167}
168
169/// Check ACL permission for incoming request
170///
171/// # Arguments
172/// - `caller_id`: The ActrId of the caller (None for local calls)
173/// - `target_id`: The ActrId of the target (self)
174/// - `acl`: ACL rules from configuration
175///
176/// # Returns
177/// - `Ok(true)`: Permission granted
178/// - `Ok(false)`: Permission denied
179/// - `Err`: ACL check failed (treat as deny)
180///
181/// # ACL Evaluation Logic
182/// 1. If no caller_id (local call), always allow
183/// 2. If no ACL configured, allow by default (permissive mode for backward compatibility)
184/// 3. If ACL configured but rules are empty, deny all (secure by default)
185/// 4. Iterate through ACL rules in order (first match wins)
186/// - Check if caller matches any principal in the rule
187/// - If matched, return the rule's permission (ALLOW/DENY)
188/// 5. If no rule matches, deny by default (secure by default)
189fn check_acl_permission(
190 caller_id: Option<&ActrId>,
191 target_id: &ActrId,
192 acl: Option<&actr_protocol::Acl>,
193) -> Result<bool, String> {
194 // 1. Local calls (no caller_id) are always allowed
195 if caller_id.is_none() {
196 tracing::trace!("ACL: Local call, allowing");
197 return Ok(true);
198 }
199
200 let caller = caller_id.unwrap();
201
202 // 2. No ACL configured - allow by default
203 let acl_rules = match acl {
204 Some(acl) => acl,
205 None => {
206 tracing::trace!(
207 "ACL: No ACL configured, allowing {} -> {}",
208 caller.to_string_repr(),
209 target_id.to_string_repr()
210 );
211 return Ok(true);
212 }
213 };
214
215 // 3. If ACL is configured but has no rules, deny all (secure by default)
216 if acl_rules.rules.is_empty() {
217 tracing::warn!(
218 "ACL: ACL configured but no rules defined, denying {} -> {} (default deny)",
219 caller.to_string_repr(),
220 target_id.to_string_repr()
221 );
222 return Ok(false);
223 }
224
225 // 4. Iterate through ACL rules (first match wins)
226 for (rule_idx, rule) in acl_rules.rules.iter().enumerate() {
227 // Check if caller matches any principal in this rule
228 let mut matched = false;
229
230 // If no principals specified, skip this rule (empty allow list = no match)
231 if rule.principals.is_empty() {
232 tracing::trace!(
233 "ACL: Rule {} has empty principals list, skipping (no match)",
234 rule_idx
235 );
236 continue;
237 }
238
239 // Check each principal
240 for principal in &rule.principals {
241 if matches_principal(caller, principal) {
242 matched = true;
243 tracing::trace!(
244 "ACL: Rule {} matched principal: caller={}, principal_realm={:?}, principal_type={:?}",
245 rule_idx,
246 caller.to_string_repr(),
247 principal.realm.as_ref().map(|r| r.realm_id),
248 principal.actr_type.as_ref().map(|t| &t.name)
249 );
250 break;
251 }
252 }
253
254 // If matched, return the permission
255 if matched {
256 let permission = rule.permission;
257 let is_allow = permission == actr_protocol::acl_rule::Permission::Allow as i32;
258
259 tracing::debug!(
260 "ACL: Rule {} matched, permission={} for {} -> {}",
261 rule_idx,
262 if is_allow { "ALLOW" } else { "DENY" },
263 caller.to_string_repr(),
264 target_id.to_string_repr()
265 );
266
267 return Ok(is_allow);
268 }
269 }
270
271 // 5. No rule matched - deny by default (secure by default)
272 tracing::warn!(
273 "ACL: No matching rule found, denying {} -> {} (default deny)",
274 caller.to_string_repr(),
275 target_id.to_string_repr()
276 );
277 Ok(false)
278}
279
280/// Check if a caller matches a principal
281///
282/// A principal matches if:
283/// - If principal.realm is specified, it must match caller.realm
284/// - If principal.actr_type is specified, it must match caller.type
285/// - If both are None, principal matches all (should not happen in practice)
286fn matches_principal(caller: &ActrId, principal: &actr_protocol::acl_rule::Principal) -> bool {
287 // Check realm match (if specified)
288 if let Some(ref principal_realm) = principal.realm
289 && caller.realm.realm_id != principal_realm.realm_id
290 {
291 return false;
292 }
293
294 // Check type match (if specified)
295 if let Some(ref principal_type) = principal.actr_type
296 && (caller.r#type.manufacturer != principal_type.manufacturer
297 || caller.r#type.name != principal_type.name)
298 {
299 return false;
300 }
301
302 // If we reach here, all specified fields matched
303 true
304}
305
306impl<W: Workload> ActrNode<W> {
307 /// Get Inproc Transport Manager
308 ///
309 /// # Returns
310 /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
311 /// - `None`: Not yet started (need to call start() first)
312 ///
313 /// # Use Cases
314 /// - Workload internals need to communicate with Shell
315 /// - Create custom LatencyFirst/MediaTrack channels
316 pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
317 self.inproc_mgr.clone()
318 }
319
320 /// Get ActorId (if registration has completed)
321 pub fn actor_id(&self) -> Option<&ActrId> {
322 self.actor_id.as_ref()
323 }
324
325 /// Get credential state (if registration has completed)
326 pub fn credential_state(&self) -> Option<CredentialState> {
327 self.credential_state.clone()
328 }
329
330 /// Get signaling client (for manual control such as UnregisterRequest)
331 pub fn signaling_client(&self) -> Arc<dyn SignalingClient> {
332 self.signaling_client.clone()
333 }
334
335 /// Get shutdown token for this node
336 pub fn shutdown_token(&self) -> CancellationToken {
337 self.shutdown_token.clone()
338 }
339
340 /// Discover remote actors of the specified type via signaling server.
341 ///
342 /// This requests best route candidates via `RouteCandidatesRequest` using the node's existing registration.
343 ///
344 /// The method returns the ordered list of candidate `ActrId`s reported by the signaling server.
345 ///
346 /// # Errors
347 /// - Returns `InvalidStateTransition` if the node is not started (no actor_id/credential).
348 /// The node must be started via `start()` before calling this method.
349 /// - Returns `TransportError` if the signaling client is not connected.
350 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
351 pub async fn discover_route_candidates(
352 &self,
353 target_type: &ActrType,
354 candidate_count: u32,
355 ) -> ActorResult<Vec<ActrId>> {
356 // Check if node is started (has actor_id and credential)
357 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
358 actr_protocol::ProtocolError::InvalidStateTransition(
359 "Node is not started. Call start() first.".to_string(),
360 )
361 })?;
362
363 // Check if the signaling client is connected
364 if !self.signaling_client.is_connected() {
365 return Err(actr_protocol::ProtocolError::TransportError(
366 "Signaling client is not connected.".to_string(),
367 ));
368 }
369
370 let client = self.signaling_client.as_ref();
371
372 let criteria = route_candidates_request::NodeSelectionCriteria {
373 candidate_count,
374 ranking_factors: Vec::new(),
375 minimal_dependency_requirement: None,
376 minimal_health_requirement: None,
377 };
378
379 let route_request = RouteCandidatesRequest {
380 target_type: target_type.clone(),
381 criteria: Some(criteria),
382 client_location: None,
383 };
384
385 let credential_state = self.credential_state.clone().ok_or_else(|| {
386 actr_protocol::ProtocolError::InvalidStateTransition(
387 "Node is not started. Call start() first.".to_string(),
388 )
389 })?;
390 let route_response = client
391 .send_route_candidates_request(
392 actor_id.clone(),
393 credential_state.credential().await,
394 route_request,
395 )
396 .await
397 .map_err(|e| {
398 actr_protocol::ProtocolError::TransportError(format!(
399 "Route candidates request failed: {e}"
400 ))
401 })?;
402
403 match route_response.result {
404 Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
405 Ok(success.candidates)
406 }
407 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
408 Err(actr_protocol::ProtocolError::TransportError(format!(
409 "Route candidates error {}: {}",
410 err.code, err.message
411 )))
412 }
413 None => Err(actr_protocol::ProtocolError::TransportError(
414 "Invalid route candidates response: missing result".to_string(),
415 )),
416 }
417 }
418
419 /// Handle incoming message envelope
420 ///
421 /// # Performance Analysis
422 /// 1. create_context: ~10ns
423 /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
424 /// 3. User business logic: variable
425 ///
426 /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
427 ///
428 /// # Zero-cost Abstraction
429 /// - Compiler can inline entire call chain
430 /// - Match branches can be directly expanded
431 /// - Final generated code approaches hand-written match expression
432 ///
433 /// # Parameters
434 /// - `envelope`: The RPC envelope containing the message
435 /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
436 ///
437 /// # caller_id Design
438 ///
439 /// **Why not in RpcEnvelope?**
440 /// - Transport layer (WebRTC/Mailbox) already knows the sender
441 /// - All connections are direct P2P (no intermediaries)
442 /// - Storing in envelope would be redundant duplication
443 ///
444 /// **How it works:**
445 /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
446 /// - Only decoded when creating Context (once per message)
447 /// - Shell calls pass `None` (local process, no remote caller)
448 /// - Remote calls decode from `MessageRecord.from`
449 ///
450 /// **trace_id vs request_id:**
451 /// - `trace_id`: Distributed tracing across entire call chain (A → B → C)
452 /// - `request_id`: Unique identifier for each request-response pair
453 /// - Both kept for flexibility in complex scenarios
454 /// - Single-hop calls: effectively identical
455 /// - Multi-hop calls: trace_id spans all hops, request_id per hop
456 #[cfg_attr(
457 feature = "opentelemetry",
458 tracing::instrument(skip_all, name = "ActrNode.handle_incoming")
459 )]
460 pub async fn handle_incoming(
461 &self,
462 envelope: RpcEnvelope,
463 caller_id: Option<&ActrId>,
464 ) -> ActorResult<Bytes> {
465 use actr_framework::MessageDispatcher;
466
467 // Log received message
468 if let Some(caller) = caller_id {
469 tracing::debug!(
470 "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
471 envelope.route_key,
472 caller.to_string_repr(),
473 envelope.request_id
474 );
475 } else {
476 tracing::debug!(
477 "📨 Handling incoming message: route_key={}, request_id={}",
478 envelope.route_key,
479 envelope.request_id
480 );
481 }
482
483 // 0. Get actor_id early for ACL check
484 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
485 actr_protocol::ProtocolError::InvalidStateTransition(
486 "Actor ID not set - node must be started before handling messages".to_string(),
487 )
488 })?;
489
490 // 0.1. ACL Permission Check (before processing message)
491 let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
492 .map_err(|err_msg| {
493 actr_protocol::ProtocolError::TransportError(format!(
494 "ACL check failed: {}",
495 err_msg
496 ))
497 })?;
498
499 if !acl_allowed {
500 tracing::warn!(
501 severity = 5,
502 error_category = "acl_denied",
503 request_id = %envelope.request_id,
504 route_key = %envelope.route_key,
505 caller = %caller_id.map(|c| c.to_string_repr()).unwrap_or_else(|| "<none>".to_string()),
506 "🚫 ACL: Permission denied"
507 );
508
509 return Err(actr_protocol::ProtocolError::Actr(
510 actr_protocol::ActrError::PermissionDenied {
511 message: format!(
512 "ACL denied: {} is not allowed to call {}",
513 caller_id
514 .map(|c| c.to_string_repr())
515 .unwrap_or_else(|| "<unknown>".to_string()),
516 actor_id.to_string_repr()
517 ),
518 },
519 ));
520 }
521
522 // 1. Create Context with caller_id from transport layer
523 let credential_state = self.credential_state.clone().ok_or_else(|| {
524 actr_protocol::ProtocolError::InvalidStateTransition(
525 "Credential not set - node must be started before handling messages".to_string(),
526 )
527 })?;
528 let ctx = self
529 .context_factory
530 .as_ref()
531 .expect("ContextFactory must be initialized in start()")
532 .create(
533 actor_id,
534 caller_id, // caller_id from transport layer (MessageRecord.from)
535 &envelope.request_id,
536 &credential_state.credential().await,
537 );
538
539 // 2. Static MessageRouter dispatch (zero-cost abstraction)
540 // Compiler will inline entire call chain, generating code close to hand-written match
541 //
542 // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
543 let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
544 &self.workload,
545 envelope.clone(),
546 &ctx,
547 ))
548 .catch_unwind()
549 .await;
550
551 let result = match result {
552 Ok(handler_result) => handler_result,
553 Err(panic_payload) => {
554 // Handler panicked - extract panic info
555 let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
556 s.to_string()
557 } else if let Some(s) = panic_payload.downcast_ref::<String>() {
558 s.clone()
559 } else {
560 "Unknown panic payload".to_string()
561 };
562
563 tracing::error!(
564 severity = 8,
565 error_category = "handler_panic",
566 route_key = envelope.route_key,
567 request_id = %envelope.request_id,
568 "❌ Handler panicked: {}",
569 panic_info
570 );
571
572 // Return DecodeFailure error with panic info
573 // (using DecodeFailure as a proxy for "cannot process message")
574 Err(actr_protocol::ProtocolError::Actr(
575 actr_protocol::ActrError::DecodeFailure {
576 message: format!("Handler panicked: {panic_info}"),
577 },
578 ))
579 }
580 };
581
582 // 3. Log result
583 match &result {
584 Ok(_) => tracing::debug!(
585 request_id = %envelope.request_id,
586 route_key = %envelope.route_key,
587 "✅ Message handled successfully"
588 ),
589 Err(e) => tracing::error!(
590 severity = 6,
591 error_category = "handler_error",
592 request_id = %envelope.request_id,
593 route_key = %envelope.route_key,
594 "❌ Message handling failed: {:?}", e
595 ),
596 }
597
598 result
599 }
600
601 /// Start the system
602 ///
603 /// # Startup Sequence
604 /// 1. Connect to signaling server and register Actor
605 /// 2. Initialize transport layer (WebRTC)
606 /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
607 /// 4. Start Mailbox processing loop (State Path serial processing)
608 /// 5. Start Transport (begin receiving messages)
609 /// 6. Create ActrRef for Shell to interact with Workload
610 ///
611 /// # Returns
612 /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
613 pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
614 tracing::info!("🚀 Starting ActrNode");
615
616 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
617 // 1. Connect to signaling server and register
618 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
619 tracing::info!("📡 Connecting to signaling server");
620 self.signaling_client.connect().await.map_err(|e| {
621 actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
622 })?;
623 tracing::info!("✅ Connected to signaling server");
624
625 // Get ActrType from configuration
626 let actr_type = self.config.actr_type().clone();
627 tracing::info!("📋 Actor type: {}", actr_type.to_string_repr());
628
629 // Calculate ServiceSpec from config exports
630 let service_spec = self.config.calculate_service_spec();
631 if let Some(ref spec) = service_spec {
632 tracing::info!("📦 Service fingerprint: {}", spec.fingerprint);
633 tracing::info!("📦 Service tags: {:?}", spec.tags);
634 } else {
635 tracing::info!("📦 No proto exports, ServiceSpec is None");
636 }
637
638 // Construct protobuf RegisterRequest
639 let register_request = RegisterRequest {
640 actr_type: actr_type.clone(),
641 realm: self.config.realm,
642 service_spec,
643 acl: self.config.acl.clone(),
644 };
645
646 tracing::info!("📤 Registering actor with signaling server (protobuf)");
647
648 // Use send_register_request to send and wait for response
649 let register_response = self
650 .signaling_client
651 .send_register_request(register_request)
652 .await
653 .map_err(|e| {
654 actr_protocol::ProtocolError::TransportError(format!(
655 "Actor registration failed: {e}"
656 ))
657 })?;
658
659 // Handle RegisterResponse oneof result
660 //
661 // Collect background task handles (including unregister task) so they can be managed
662 // by ActrRefShared later.
663 let mut task_handles = Vec::new();
664
665 match register_response.result {
666 Some(register_response::Result::Success(register_ok)) => {
667 let actor_id = register_ok.actr_id;
668 let credential = register_ok.credential;
669
670 tracing::info!("✅ Registration successful");
671 tracing::info!(
672 "🆔 Assigned ActrId: {}",
673 actr_protocol::ActrIdExt::to_string_repr(&actor_id)
674 );
675 tracing::info!(
676 "🔐 Received credential (token_key_id: {})",
677 credential.token_key_id
678 );
679 tracing::info!(
680 "💓 Signaling heartbeat interval: {} seconds",
681 register_ok.signaling_heartbeat_interval_secs
682 );
683
684 // Log additional information (if available)
685 if register_ok.psk.is_some() {
686 tracing::debug!("🔑 Received PSK (bootstrap keying material)");
687 }
688
689 if let Some(expires_at) = ®ister_ok.credential_expires_at {
690 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
691 }
692
693 // Store ActrId and Credential
694 self.actor_id = Some(actor_id.clone());
695 let credential_state = CredentialState::new(
696 credential,
697 register_ok.credential_expires_at,
698 register_ok.psk.clone(),
699 );
700 self.credential_state = Some(credential_state.clone());
701
702 // Pass identity to signaling client so reconnect URLs carry auth info.
703 self.signaling_client.set_actor_id(actor_id.clone()).await;
704 self.signaling_client
705 .set_credential_state(credential_state.clone())
706 .await;
707 // Store PSK and public_key for TURN authentication
708 self.psk = register_ok.psk.clone();
709
710 // Persist identity into ContextFactory for later Context creation
711 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
712 // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
713 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
714 let shell_to_workload = self
715 .context_factory
716 .as_ref()
717 .expect("ContextFactory must exist")
718 .shell_to_workload();
719 let workload_to_shell = self
720 .context_factory
721 .as_ref()
722 .expect("ContextFactory must exist")
723 .workload_to_shell();
724 self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
725 self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
726
727 tracing::info!(
728 "✅ Inproc infrastructure already ready (created in ActrSystem::new())"
729 );
730
731 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
732 // 1.5. Create WebRTC infrastructure
733 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
734 tracing::info!("🌐 Initializing WebRTC infrastructure");
735
736 // Get MediaFrameRegistry from ContextFactory
737 let media_frame_registry = self
738 .context_factory
739 .as_ref()
740 .expect("ContextFactory must exist")
741 .media_frame_registry
742 .clone();
743
744 // Create WebRtcCoordinator
745 let coordinator =
746 Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
747 actor_id.clone(),
748 credential_state.clone(),
749 self.signaling_client.clone(),
750 self.config.webrtc.clone(),
751 self.config.realm.realm_id.clone(),
752 media_frame_registry,
753 ));
754
755 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
756 // 1.6. Create OutprocTransportManager + OutprocOutGate (新架构)
757 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
758 tracing::info!("🏗️ Creating OutprocTransportManager with WebRTC support");
759
760 // Create DefaultWireBuilder with WebRTC coordinator
761 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
762 let wire_builder_config = DefaultWireBuilderConfig {
763 websocket_url_template: None, // WebSocket disabled for now
764 enable_webrtc: true,
765 enable_websocket: false,
766 };
767 let wire_builder = Arc::new(DefaultWireBuilder::new(
768 Some(coordinator.clone()),
769 wire_builder_config,
770 ));
771
772 // Create OutprocTransportManager
773 use crate::transport::OutprocTransportManager;
774 let transport_manager =
775 Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
776
777 // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
778 use crate::outbound::{OutGate, OutprocOutGate};
779 let outproc_gate = Arc::new(OutprocOutGate::new(
780 transport_manager,
781 Some(coordinator.clone()), // Enable MediaTrack support
782 ));
783 let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
784
785 tracing::info!("✅ OutprocTransportManager + OutprocOutGate initialized");
786
787 // Get DataStreamRegistry from ContextFactory
788 let data_stream_registry = self
789 .context_factory
790 .as_ref()
791 .expect("ContextFactory must exist")
792 .data_stream_registry
793 .clone();
794
795 // Create WebRtcGate with shared pending_requests and DataStreamRegistry
796 let pending_requests = outproc_gate.get_pending_requests();
797 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
798 coordinator.clone(),
799 pending_requests,
800 data_stream_registry,
801 ));
802
803 // Set local_id
804 gate.set_local_id(actor_id.clone()).await;
805
806 tracing::info!(
807 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
808 );
809
810 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
811 // 1.7. Set outproc_gate in ContextFactory (completing initialization)
812 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
813 tracing::info!("🔧 Setting outproc_gate in ContextFactory");
814 self.context_factory
815 .as_mut()
816 .expect("ContextFactory must exist")
817 .set_outproc_gate(outproc_gate_enum);
818
819 tracing::info!(
820 "✅ ContextFactory fully initialized (inproc + outproc gates ready)"
821 );
822
823 // Save references (WebRtcGate kept for backward compatibility if needed)
824 self.webrtc_coordinator = Some(coordinator.clone());
825 self.webrtc_gate = Some(gate.clone());
826
827 tracing::info!("✅ WebRTC infrastructure initialized");
828
829 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
830 // 1.7.5. Create shared state for credential management
831 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
832 // Shared credential state initialized above; reused across tasks
833
834 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
835 // 1.8. Spawn heartbeat task (periodic Ping to signaling server)
836 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
837 {
838 let shutdown = self.shutdown_token.clone();
839 let client = self.signaling_client.clone();
840 let actor_id_for_heartbeat = actor_id.clone();
841 let credential_state_for_heartbeat = credential_state.clone();
842 let mailbox_for_heartbeat = self.mailbox.clone();
843
844 // Use interval from registration response, default to 30s
845 let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
846 let heartbeat_interval = if heartbeat_interval_secs > 0 {
847 Duration::from_secs(heartbeat_interval_secs as u64)
848 } else {
849 Duration::from_secs(30)
850 };
851
852 let heartbeat_handle = tokio::spawn(heartbeat_task(
853 shutdown,
854 client,
855 actor_id_for_heartbeat,
856 credential_state_for_heartbeat,
857 mailbox_for_heartbeat,
858 heartbeat_interval,
859 ));
860
861 task_handles.push(heartbeat_handle);
862 }
863 tracing::info!(
864 "✅ Heartbeat task started (interval: {}s)",
865 register_ok.signaling_heartbeat_interval_secs
866 );
867
868 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
869 // 1.9. Spawn dedicated Unregister task (best-effort, with timeout)
870 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
871 //
872 // This task:
873 // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
874 // - Then sends UnregisterRequest via signaling client with a timeout
875 //
876 // NOTE: we push its JoinHandle into task_handles so it can be aborted
877 // by ActrRefShared::Drop if needed.
878 {
879 let shutdown = self.shutdown_token.clone();
880 let client = self.signaling_client.clone();
881 let actor_id_for_unreg = actor_id.clone();
882 let credential_state_for_unreg = credential_state.clone();
883 let webrtc_coordinator = self.webrtc_coordinator.clone();
884
885 let unregister_handle = tokio::spawn(async move {
886 // Wait for shutdown signal
887 shutdown.cancelled().await;
888 tracing::info!(
889 "📡 Shutdown signal received2, sending UnregisterRequest for Actor {:?}",
890 actor_id_for_unreg
891 );
892
893 // 1. 先关闭所有 WebRTC peer 连接(如果存在)
894 if let Some(coord) = webrtc_coordinator {
895 if let Err(e) = coord.close_all_peers().await {
896 tracing::warn!(
897 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
898 e
899 );
900 } else {
901 tracing::info!(
902 "✅ All WebRTC peers closed before UnregisterRequest"
903 );
904 }
905 } else {
906 tracing::debug!(
907 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
908 );
909 }
910
911 // 2. 再发送 UnregisterRequest,设置一个超时(例如 5 秒)
912 let result = tokio::time::timeout(
913 std::time::Duration::from_secs(5),
914 client.send_unregister_request(
915 actor_id_for_unreg.clone(),
916 credential_state_for_unreg.credential().await,
917 Some("Graceful shutdown".to_string()),
918 ),
919 )
920 .await;
921 tracing::info!("UnregisterRequest result: {:?}", result);
922 match result {
923 Ok(Ok(_)) => {
924 tracing::info!(
925 "✅ UnregisterRequest sent to signaling server for Actor {:?}",
926 actor_id_for_unreg
927 );
928 }
929 Ok(Err(e)) => {
930 tracing::warn!(
931 "⚠️ Failed to send UnregisterRequest for Actor {:?}: {}",
932 actor_id_for_unreg,
933 e
934 );
935 }
936 Err(_) => {
937 tracing::warn!(
938 "⚠️ UnregisterRequest timeout (5s) for Actor {:?}",
939 actor_id_for_unreg
940 );
941 }
942 }
943 });
944
945 task_handles.push(unregister_handle);
946 }
947
948 // Spawn signaling auto-reconnect helper that reacts immediately to disconnect events.
949 crate::wire::webrtc::spawn_signaling_reconnector(
950 self.signaling_client.clone(),
951 self.shutdown_token.clone(),
952 );
953 }
954 Some(register_response::Result::Error(error)) => {
955 tracing::error!(
956 severity = 10,
957 error_category = "registration_error",
958 error_code = error.code,
959 "❌ Registration failed: code={}, message={}",
960 error.code,
961 error.message
962 );
963 return Err(actr_protocol::ProtocolError::TransportError(format!(
964 "Registration rejected: {} (code: {})",
965 error.message, error.code
966 )));
967 }
968 None => {
969 tracing::error!(
970 severity = 10,
971 error_category = "registration_error",
972 "❌ Registration response missing result"
973 );
974 return Err(actr_protocol::ProtocolError::TransportError(
975 "Invalid registration response: missing result".to_string(),
976 ));
977 }
978 }
979
980 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
981 // 2. Transport layer initialization (completed via WebRTC infrastructure)
982 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
983 tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
984
985 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
986 // 3.1 Convert to Arc (before starting background loops)
987 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
988 // Clone actor_id before moving self into Arc
989 let actor_id = self
990 .actor_id
991 .as_ref()
992 .ok_or_else(|| {
993 actr_protocol::ProtocolError::InvalidStateTransition(
994 "Actor ID not set - registration must complete before starting node"
995 .to_string(),
996 )
997 })?
998 .clone();
999 let credential_state = self.credential_state.clone().ok_or_else(|| {
1000 actr_protocol::ProtocolError::InvalidStateTransition(
1001 "Credential not set - node must be started before handling messages".to_string(),
1002 )
1003 })?;
1004
1005 let actor_id_for_shell = actor_id.clone();
1006 let shutdown_token = self.shutdown_token.clone();
1007 let node_ref = Arc::new(self);
1008
1009 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1010 // 3.5. Start WebRTC background loops (BEFORE on_start)
1011 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1012 // CRITICAL: Start signaling loop before on_start() to avoid deadlock
1013 // where on_start() tries to send messages but signaling loop isn't running
1014 tracing::info!("🚀 Starting WebRTC background loops");
1015
1016 // Start WebRtcCoordinator signaling loop
1017 if let Some(coordinator) = &node_ref.webrtc_coordinator {
1018 coordinator.clone().start().await.map_err(|e| {
1019 actr_protocol::ProtocolError::TransportError(format!(
1020 "WebRtcCoordinator start failed: {e}"
1021 ))
1022 })?;
1023 tracing::info!("✅ WebRtcCoordinator signaling loop started");
1024 }
1025
1026 // Start WebRtcGate message receive loop (route to Mailbox)
1027 if let Some(gate) = &node_ref.webrtc_gate {
1028 gate.start_receive_loop(node_ref.mailbox.clone())
1029 .await
1030 .map_err(|e| {
1031 actr_protocol::ProtocolError::TransportError(format!(
1032 "WebRtcGate receive loop start failed: {e}"
1033 ))
1034 })?;
1035 tracing::info!("✅ WebRtcGate → Mailbox routing started");
1036 }
1037
1038 tracing::info!("✅ WebRTC background loops started");
1039
1040 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1041 // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
1042 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1043 tracing::info!("🪝 Calling lifecycle hook: on_start");
1044
1045 let ctx = node_ref
1046 .context_factory
1047 .as_ref()
1048 .expect("ContextFactory must be initialized before on_start")
1049 .create(
1050 &actor_id,
1051 None, // caller_id
1052 "bootstrap", // request_id
1053 &credential_state.credential().await,
1054 );
1055 node_ref.workload.on_start(&ctx).await?;
1056 tracing::info!("✅ Lifecycle hook on_start completed");
1057
1058 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1059 // 4.6. Start Inproc receive loop (Shell → Workload)
1060 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1061 tracing::info!("🔄 Starting Inproc receive loop (Shell → Workload)");
1062 // Start Workload receive loop (Shell → Workload REQUEST)
1063 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
1064 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
1065 let node = node_ref.clone();
1066 let request_rx_lane = shell_to_workload
1067 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
1068 .await
1069 .map_err(|e| {
1070 actr_protocol::ProtocolError::TransportError(format!(
1071 "Failed to get Workload receive lane: {e}"
1072 ))
1073 })?;
1074 let response_tx = workload_to_shell.clone();
1075 let shutdown = shutdown_token.clone();
1076
1077 let inproc_handle = tokio::spawn(async move {
1078 loop {
1079 tokio::select! {
1080 _ = shutdown.cancelled() => {
1081 tracing::info!("📭 Workload receive loop (Shell → Workload) received shutdown signal");
1082 break;
1083 }
1084
1085 envelope_result = request_rx_lane.recv_envelope() => {
1086 match envelope_result {
1087 Ok(envelope) => {
1088 let request_id = envelope.request_id.clone();
1089 // Extract and set tracing context from envelope
1090 #[cfg(feature = "opentelemetry")]
1091 let span = {
1092 let span = tracing::info_span!("actrNode.lane.receive_rpc", request_id = %request_id);
1093 set_parent_from_rpc_envelope(&span, &envelope);
1094 span
1095 };
1096
1097 tracing::debug!("📨 Workload received REQUEST from Shell: request_id={}", request_id);
1098
1099 // Shell calls have no caller_id (local process communication)
1100 let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1101 #[cfg(feature = "opentelemetry")]
1102 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1103 match handle_incoming_fut.await {
1104 Ok(response_bytes) => {
1105 // Send RESPONSE back via workload_to_shell
1106 // Keep same route_key (no prefix needed - separate channels!)
1107 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1108 let mut response_envelope = RpcEnvelope {
1109 route_key: envelope.route_key.clone(),
1110 payload: Some(response_bytes),
1111 error: None,
1112 traceparent: None,
1113 tracestate: None,
1114 request_id: request_id.clone(),
1115 metadata: Vec::new(),
1116 timeout_ms: 30000,
1117 };
1118 // Inject tracing context
1119 #[cfg(feature = "opentelemetry")]
1120 inject_span_context_to_rpc(&span, &mut response_envelope);
1121
1122 // Send via Workload → Shell channel
1123 let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1124 #[cfg(feature = "opentelemetry")]
1125 let send_response_fut = send_response_fut.instrument(span.clone());
1126 if let Err(e) = send_response_fut.await {
1127 tracing::error!(
1128 severity = 7,
1129 error_category = "transport_error",
1130 request_id = %request_id,
1131 "❌ Failed to send RESPONSE to Shell: {:?}", e
1132 );
1133 }
1134 }
1135 Err(e) => {
1136 tracing::error!(
1137 severity = 6,
1138 error_category = "handler_error",
1139 request_id = %request_id,
1140 route_key = %envelope.route_key,
1141 "❌ Workload message handling failed: {:?}", e
1142 );
1143
1144 // Send error response (system-level error on envelope)
1145 let error_response = actr_protocol::ErrorResponse {
1146 code: protocol_error_to_code(&e),
1147 message: e.to_string(),
1148 };
1149
1150 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1151 let mut error_envelope = RpcEnvelope {
1152 route_key: envelope.route_key.clone(),
1153 payload: None,
1154 error: Some(error_response),
1155 traceparent: envelope.traceparent.clone(),
1156 tracestate: envelope.tracestate.clone(),
1157 request_id: request_id.clone(),
1158 metadata: Vec::new(),
1159 timeout_ms: 30000,
1160 };
1161 // Inject tracing context
1162 #[cfg(feature = "opentelemetry")]
1163 inject_span_context_to_rpc(&span, &mut error_envelope);
1164
1165 let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1166 #[cfg(feature = "opentelemetry")]
1167 let send_error_response_fut = send_error_response_fut.instrument(span);
1168 if let Err(e) = send_error_response_fut.await {
1169 tracing::error!(
1170 severity = 7,
1171 error_category = "transport_error",
1172 request_id = %request_id,
1173 "❌ Failed to send ERROR response to Shell: {:?}", e
1174 );
1175 }
1176 }
1177 }
1178 }
1179 Err(e) => {
1180 tracing::error!(
1181 severity = 8,
1182 error_category = "transport_error",
1183 "❌ Failed to receive from Shell → Workload lane: {:?}", e
1184 );
1185 break;
1186 }
1187 }
1188 }
1189 }
1190 }
1191 tracing::info!(
1192 "✅ Workload receive loop (Shell → Workload) terminated gracefully"
1193 );
1194 });
1195
1196 task_handles.push(inproc_handle);
1197 }
1198 }
1199 tracing::info!("✅ Workload receive loop (Shell → Workload REQUEST) started");
1200
1201 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1202 // 4.7. Start Shell receive loop (Workload → Shell RESPONSE)
1203 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1204 tracing::info!("🔄 Starting Shell receive loop (Workload → Shell RESPONSE)");
1205 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
1206 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
1207 let response_rx_lane = workload_to_shell
1208 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
1209 .await
1210 .map_err(|e| {
1211 actr_protocol::ProtocolError::TransportError(format!(
1212 "Failed to get Shell receive lane: {e}"
1213 ))
1214 })?;
1215 let request_mgr = shell_to_workload.clone();
1216 let shutdown = shutdown_token.clone();
1217
1218 let shell_receive_handle = tokio::spawn(async move {
1219 loop {
1220 tokio::select! {
1221 _ = shutdown.cancelled() => {
1222 tracing::info!("📭 Shell receive loop (Workload → Shell) received shutdown signal");
1223 break;
1224 }
1225
1226 envelope_result = response_rx_lane.recv_envelope() => {
1227 match envelope_result {
1228 Ok(envelope) => {
1229 tracing::debug!("📨 Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
1230
1231 // Check if response is success or error
1232 match (envelope.payload, envelope.error) {
1233 (Some(payload), None) => {
1234 // Success response
1235 if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
1236 tracing::warn!(
1237 severity = 4,
1238 error_category = "orphan_response",
1239 request_id = %envelope.request_id,
1240 "⚠️ No pending request found for response: {:?}", e
1241 );
1242 }
1243 }
1244 (None, Some(error)) => {
1245 // Error response - convert to ProtocolError and complete with error
1246 let protocol_err = actr_protocol::ProtocolError::TransportError(
1247 format!("RPC error {}: {}", error.code, error.message)
1248 );
1249 if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
1250 tracing::warn!(
1251 severity = 4,
1252 error_category = "orphan_response",
1253 request_id = %envelope.request_id,
1254 "⚠️ No pending request found for error response: {:?}", e
1255 );
1256 }
1257 }
1258 _ => {
1259 tracing::error!(
1260 severity = 7,
1261 error_category = "protocol_error",
1262 request_id = %envelope.request_id,
1263 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1264 );
1265 }
1266 }
1267 }
1268 Err(e) => {
1269 tracing::error!(
1270 severity = 8,
1271 error_category = "transport_error",
1272 "❌ Failed to receive from Workload → Shell lane: {:?}", e
1273 );
1274 break;
1275 }
1276 }
1277 }
1278 }
1279 }
1280 tracing::info!(
1281 "✅ Shell receive loop (Workload → Shell) terminated gracefully"
1282 );
1283 });
1284
1285 task_handles.push(shell_receive_handle);
1286 }
1287 }
1288 tracing::info!("✅ Shell receive loop (Workload → Shell RESPONSE) started");
1289
1290 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1291 // 5. Start Mailbox processing loop (State Path)
1292 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1293 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
1294 {
1295 let node = node_ref.clone();
1296 let mailbox = node_ref.mailbox.clone();
1297 let gate = node_ref.webrtc_gate.clone();
1298 let shutdown = shutdown_token.clone();
1299
1300 let mailbox_handle = tokio::spawn(async move {
1301 loop {
1302 tokio::select! {
1303 // Listen for shutdown signal
1304 _ = shutdown.cancelled() => {
1305 tracing::info!("📭 Mailbox loop received shutdown signal");
1306 break;
1307 }
1308
1309 // Dequeue messages (by priority)
1310 result = mailbox.dequeue() => {
1311 match result {
1312 Ok(messages) => {
1313 if messages.is_empty() {
1314 // Queue empty, sleep briefly
1315 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1316 continue;
1317 }
1318
1319 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
1320
1321 // Process messages one by one
1322 for msg_record in messages {
1323 // Deserialize RpcEnvelope (Protobuf)
1324 match RpcEnvelope::decode(&msg_record.payload[..]) {
1325 Ok(envelope) => {
1326 let request_id = envelope.request_id.clone();
1327 #[cfg(feature = "opentelemetry")]
1328 let span = {
1329 let span = tracing::info_span!("actrNode.mailbox.receive_rpc", request_id = %request_id);
1330 set_parent_from_rpc_envelope(&span, &envelope);
1331 span
1332 };
1333 tracing::debug!("📦 Processing message: request_id={}", request_id);
1334
1335 // Decode caller_id from MessageRecord.from (transport layer)
1336 let caller_id_result = ActrId::decode(&msg_record.from[..]);
1337 let caller_id_ref = caller_id_result.as_ref().ok();
1338
1339 if caller_id_ref.is_none() {
1340 tracing::warn!(
1341 request_id = %request_id,
1342 "⚠️ Failed to decode caller_id from MessageRecord.from"
1343 );
1344 }
1345
1346 // Call handle_incoming with caller_id from transport layer
1347 let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
1348 #[cfg(feature = "opentelemetry")]
1349 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1350 match handle_incoming_fut.await {
1351 Ok(response_bytes) => {
1352 // Send response (reuse request_id)
1353 if let Some(ref gate) = gate {
1354 // Use already decoded caller_id
1355 match caller_id_result {
1356 Ok(caller) => {
1357 // Construct response RpcEnvelope (reuse request_id!)
1358 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1359 let mut response_envelope = RpcEnvelope {
1360 request_id, // Reuse!
1361 route_key: envelope.route_key.clone(),
1362 payload: Some(response_bytes),
1363 error: None,
1364 traceparent: envelope.traceparent.clone(),
1365 tracestate: envelope.tracestate.clone(),
1366 metadata: Vec::new(), // Response doesn't need extra metadata
1367 timeout_ms: 30000,
1368 };
1369 // Inject tracing context
1370 #[cfg(feature = "opentelemetry")]
1371 inject_span_context_to_rpc(&span, &mut response_envelope);
1372
1373 let send_response_fut = gate.send_response(&caller, response_envelope);
1374 #[cfg(feature = "opentelemetry")]
1375 let send_response_fut = send_response_fut.instrument(span);
1376 if let Err(e) = send_response_fut.await {
1377 tracing::error!(
1378 severity = 7,
1379 error_category = "transport_error",
1380 request_id = %envelope.request_id,
1381 "❌ Failed to send response: {:?}", e
1382 );
1383 }
1384 }
1385 Err(e) => {
1386 tracing::error!(
1387 severity = 8,
1388 error_category = "protobuf_decode",
1389 request_id = %envelope.request_id,
1390 "❌ Failed to decode caller_id: {:?}", e
1391 );
1392 }
1393 }
1394 }
1395
1396 // ACK message
1397 if let Err(e) = mailbox.ack(msg_record.id).await {
1398 tracing::error!(
1399 severity = 9,
1400 error_category = "mailbox_error",
1401 request_id = %envelope.request_id,
1402 message_id = %msg_record.id,
1403 "❌ Mailbox ACK failed: {:?}", e
1404 );
1405 }
1406 }
1407 Err(e) => {
1408 tracing::error!(
1409 severity = 6,
1410 error_category = "handler_error",
1411 request_id = %envelope.request_id,
1412 route_key = %envelope.route_key,
1413 "❌ handle_incoming failed: {:?}", e
1414 );
1415 // ACK to avoid infinite retries
1416 // Application errors are caller's responsibility
1417 let _ = mailbox.ack(msg_record.id).await;
1418 }
1419 }
1420 }
1421 Err(e) => {
1422 // Poison message - cannot decode RpcEnvelope
1423 tracing::error!(
1424 severity = 9,
1425 error_category = "protobuf_decode",
1426 message_id = %msg_record.id,
1427 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
1428 );
1429
1430 // Write to Dead Letter Queue
1431 use actr_mailbox::DlqRecord;
1432 use chrono::Utc;
1433 use uuid::Uuid;
1434
1435 let dlq_record = DlqRecord {
1436 id: Uuid::new_v4(),
1437 original_message_id: Some(msg_record.id.to_string()),
1438 from: Some(msg_record.from.clone()),
1439 to: node.actor_id.as_ref().map(|id| {
1440 let mut buf = Vec::new();
1441 id.encode(&mut buf).unwrap();
1442 buf
1443 }),
1444 raw_bytes: msg_record.payload.clone(),
1445 error_message: format!("Protobuf decode failed: {e}"),
1446 error_category: "protobuf_decode".to_string(),
1447 trace_id: format!("mailbox-{}", msg_record.id), // Fallback trace_id
1448 request_id: None,
1449 created_at: Utc::now(),
1450 redrive_attempts: 0,
1451 last_redrive_at: None,
1452 context: Some(format!(
1453 r#"{{"source":"mailbox","priority":"{}"}}"#,
1454 match msg_record.priority {
1455 actr_mailbox::MessagePriority::High => "high",
1456 actr_mailbox::MessagePriority::Normal => "normal",
1457 }
1458 )),
1459 };
1460
1461 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
1462 tracing::error!(
1463 severity = 10,
1464 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
1465 );
1466 } else {
1467 tracing::warn!(
1468 severity = 9,
1469 "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
1470 );
1471 }
1472
1473 // ACK the poison message to remove from mailbox
1474 let _ = mailbox.ack(msg_record.id).await;
1475 }
1476 }
1477 }
1478 }
1479 Err(e) => {
1480 tracing::error!(
1481 severity = 9,
1482 error_category = "mailbox_error",
1483 "❌ Mailbox dequeue failed: {:?}", e
1484 );
1485 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1486 }
1487 }
1488 }
1489 }
1490 }
1491 tracing::info!("✅ Mailbox processing loop terminated gracefully");
1492 });
1493
1494 task_handles.push(mailbox_handle);
1495 }
1496 tracing::info!("✅ Mailbox processing loop started");
1497
1498 tracing::info!("✅ ActrNode started successfully");
1499
1500 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1501 // 6. Create ActrRef for Shell to interact with Workload
1502 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1503 use crate::actr_ref::{ActrRef, ActrRefShared};
1504 use crate::outbound::InprocOutGate;
1505
1506 // Create InprocOutGate from shell_to_workload transport manager
1507 let shell_to_workload = node_ref
1508 .inproc_mgr
1509 .clone()
1510 .expect("inproc_mgr must be initialized");
1511 let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
1512
1513 // Create ActrRefShared
1514 let actr_ref_shared = Arc::new(ActrRefShared {
1515 actor_id: actor_id_for_shell.clone(),
1516 inproc_gate,
1517 shutdown_token: shutdown_token.clone(),
1518 task_handles: tokio::sync::Mutex::new(task_handles),
1519 });
1520
1521 // Create ActrRef
1522 let actr_ref = ActrRef::new(actr_ref_shared, node_ref);
1523
1524 tracing::info!("✅ ActrRef created (Shell → Workload communication handle)");
1525
1526 Ok(actr_ref)
1527 }
1528}
1529
#[cfg(test)]
mod tests {
    //! Unit tests for `CredentialState`: construction, reads, updates, and
    //! concurrent access from multiple tokio tasks.

    use super::*;
    use actr_protocol::AIdCredential;
    use prost_types::Timestamp;
    use std::collections::HashSet;

    /// Builds a credential with a fixed 4-byte token (`[1, 2, 3, 4]`) and the
    /// given key id.
    fn create_test_credential(token_key_id: u32) -> AIdCredential {
        AIdCredential {
            encrypted_token: vec![1, 2, 3, 4].into(),
            token_key_id,
        }
    }

    /// Builds a timestamp at `seconds` with zero nanoseconds.
    fn create_test_timestamp(seconds: i64) -> Timestamp {
        Timestamp { seconds, nanos: 0 }
    }

    #[tokio::test]
    async fn test_credential_state_initialization() {
        let credential = create_test_credential(1);
        let expires_at = Some(create_test_timestamp(1000));

        let state = CredentialState::new(credential.clone(), expires_at, None);

        let retrieved_credential = state.credential().await;
        assert_eq!(retrieved_credential.token_key_id, 1);
        assert_eq!(retrieved_credential.encrypted_token.as_ref(), &[1, 2, 3, 4]);

        let retrieved_expires_at = state.expires_at().await;
        assert_eq!(retrieved_expires_at, expires_at);
    }

    #[tokio::test]
    async fn test_credential_state_without_expiration() {
        let credential = create_test_credential(2);
        let state = CredentialState::new(credential.clone(), None, None);

        let retrieved_credential = state.credential().await;
        assert_eq!(retrieved_credential.token_key_id, 2);

        let retrieved_expires_at = state.expires_at().await;
        assert!(retrieved_expires_at.is_none());
    }

    #[tokio::test]
    async fn test_credential_state_update() {
        let credential1 = create_test_credential(1);
        let expires_at1 = Some(create_test_timestamp(1000));
        let state = CredentialState::new(credential1, expires_at1, None);

        // Verify initial state
        let initial_credential = state.credential().await;
        assert_eq!(initial_credential.token_key_id, 1);

        // Update credential
        let credential2 = create_test_credential(2);
        let expires_at2 = Some(create_test_timestamp(2000));
        state.update(credential2.clone(), expires_at2, None).await;

        // Verify updated state
        let updated_credential = state.credential().await;
        assert_eq!(updated_credential.token_key_id, 2);
        assert_eq!(
            updated_credential.encrypted_token,
            credential2.encrypted_token
        );

        let updated_expires_at = state.expires_at().await;
        assert_eq!(updated_expires_at, Some(create_test_timestamp(2000)));
    }

    #[tokio::test]
    async fn test_credential_state_concurrent_access() {
        let credential = create_test_credential(1);
        let expires_at = Some(create_test_timestamp(1000));
        let state = CredentialState::new(credential, expires_at, None);

        // Spawn several tasks that read the credential state concurrently.
        // Each task returns its own index, so we can verify that every task
        // ran to completion. A panic inside a task (e.g. a failed assert)
        // surfaces as a `JoinError` when the handle is awaited.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let state_clone = state.clone();
                tokio::spawn(async move {
                    let cred = state_clone.credential().await;
                    assert_eq!(cred.token_key_id, 1);
                    i
                })
            })
            .collect();

        // Wait for all tasks and check that each index came back exactly once.
        let mut completed = HashSet::new();
        for handle in handles {
            completed.insert(handle.await.unwrap());
        }
        assert_eq!(completed, (0..10).collect::<HashSet<_>>());

        // Concurrent reads must leave the stored state untouched.
        assert_eq!(state.credential().await.token_key_id, 1);
        assert_eq!(state.expires_at().await, expires_at);
    }

    #[tokio::test]
    async fn test_credential_state_update_concurrent() {
        let credential1 = create_test_credential(1);
        let state = CredentialState::new(credential1, None, None);

        // Spawn multiple update tasks, each writing a distinct key id in 2..12.
        let mut handles = vec![];
        for i in 2..12 {
            let state_clone = state.clone();
            let credential = create_test_credential(i);
            let handle = tokio::spawn(async move {
                state_clone.update(credential, None, None).await;
            });
            handles.push(handle);
        }

        // Wait for all updates to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // The updates race, so which write lands last is nondeterministic.
        // The final credential must still be exactly one of the written values,
        // never the initial one or a torn mix.
        let final_credential = state.credential().await;
        assert!((2..12).contains(&final_credential.token_key_id));

        // Every update passed `None`, so no expiration should be set.
        assert!(state.expires_at().await.is_none());
    }
}
1652}