// actr_runtime/lifecycle/actr_node.rs
1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use actr_framework::{Bytes, Workload};
4use actr_protocol::prost::Message as ProstMessage;
5use actr_protocol::{
6 ActorResult, ActrId, ActrType, PayloadType, RouteCandidatesRequest, RpcEnvelope,
7 route_candidates_request,
8};
9use futures_util::FutureExt;
10use std::sync::Arc;
11use tokio_util::sync::CancellationToken;
12use tracing::Instrument as _;
13
14use crate::context_factory::ContextFactory;
15use crate::transport::InprocTransportManager;
16
17// Use types from sub-crates
18use crate::wire::webrtc::{SignalingClient, WebRtcConfig};
19use actr_mailbox::{DeadLetterQueue, Mailbox};
20use actr_protocol::{AIdCredential, RegisterRequest, register_response};
21
22// Use extension traits from actr-protocol
23use actr_protocol::{ActrIdExt, ActrTypeExt};
24
/// ActrNode - ActrSystem + Workload (1:1 composition)
///
/// One `ActrNode` owns exactly one `Workload` instance together with the runtime
/// plumbing it needs: mailbox, dead-letter queue, signaling client, WebRTC
/// coordinator/gate, and the two inproc transport managers.
///
/// # Generic Parameters
/// - `W`: Workload type
///
/// # MessageDispatcher Association
/// - Statically associated via W::Dispatcher
/// - Does not store Dispatcher instance (not even ZST needed)
/// - Dispatch calls entirely through type system
pub struct ActrNode<W: Workload> {
    /// Runtime configuration
    pub(crate) config: actr_config::Config,

    /// Workload instance (the only business logic)
    pub(crate) workload: Arc<W>,

    /// SQLite persistent mailbox
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// Context factory (created in start() after obtaining ActorId).
    /// `None` until registration completes.
    pub(crate) context_factory: Option<ContextFactory>,

    /// Signaling client
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID (obtained after startup; `None` before registration)
    pub(crate) actor_id: Option<ActrId>,

    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
    pub(crate) credential: Option<AIdCredential>,

    /// WebRTC coordinator (created after startup)
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,

    /// WebRTC Gate (created after startup)
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// Shell → Workload Transport Manager
    ///
    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
    pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,

    /// Workload → Shell Transport Manager
    ///
    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
    pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,

    /// Shutdown token for graceful shutdown
    pub(crate) shutdown_token: CancellationToken,
}
78
79/// Map ProtocolError to error code for ErrorResponse
80fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
81 use actr_protocol::ProtocolError;
82 match err {
83 ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
84 ProtocolError::Uri(_) => 400, // Bad Request - URI parsing error
85 ProtocolError::Name(_) => 400, // Bad Request - invalid name
86 ProtocolError::SerializationError(_) => 500, // Internal Server Error
87 ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
88 ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
89 ProtocolError::EncodeError(_) => 500, // Internal Server Error
90 ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
91 ProtocolError::TransportError(_) => 503, // Service Unavailable
92 ProtocolError::Timeout => 504, // Gateway Timeout
93 ProtocolError::TargetNotFound(_) => 404, // Not Found
94 ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
95 ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
96 }
97}
98
99impl<W: Workload> ActrNode<W> {
100 /// Get Inproc Transport Manager
101 ///
102 /// # Returns
103 /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
104 /// - `None`: Not yet started (need to call start() first)
105 ///
106 /// # Use Cases
107 /// - Workload internals need to communicate with Shell
108 /// - Create custom LatencyFirst/MediaTrack channels
109 pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
110 self.inproc_mgr.clone()
111 }
112
113 /// Get ActorId (if registration has completed)
114 pub fn actor_id(&self) -> Option<&ActrId> {
115 self.actor_id.as_ref()
116 }
117
118 /// Get credential (if registration has completed)
119 pub fn credential(&self) -> Option<&AIdCredential> {
120 self.credential.as_ref()
121 }
122
123 /// Get signaling client (for manual control such as UnregisterRequest)
124 pub fn signaling_client(&self) -> Arc<dyn SignalingClient> {
125 self.signaling_client.clone()
126 }
127
128 /// Get shutdown token for this node
129 pub fn shutdown_token(&self) -> CancellationToken {
130 self.shutdown_token.clone()
131 }
132
133 /// Discover remote actors of the specified type via signaling server.
134 ///
135 /// This requests best route candidates via `RouteCandidatesRequest` using the node's existing registration.
136 ///
137 /// The method returns the ordered list of candidate `ActrId`s reported by the signaling server.
138 ///
139 /// # Errors
140 /// - Returns `InvalidStateTransition` if the node is not started (no actor_id/credential).
141 /// The node must be started via `start()` before calling this method.
142 /// - Returns `TransportError` if the signaling client is not connected.
143 pub async fn discover_route_candidates(
144 &self,
145 target_type: &ActrType,
146 candidate_count: u32,
147 ) -> ActorResult<Vec<ActrId>> {
148 // Check if node is started (has actor_id and credential)
149 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
150 actr_protocol::ProtocolError::InvalidStateTransition(
151 "Node is not started. Call start() first.".to_string(),
152 )
153 })?;
154
155 let credential = self.credential.as_ref().ok_or_else(|| {
156 actr_protocol::ProtocolError::InvalidStateTransition(
157 "Node is not started. Call start() first.".to_string(),
158 )
159 })?;
160
161 // Check if the signaling client is connected
162 if !self.signaling_client.is_connected() {
163 return Err(actr_protocol::ProtocolError::TransportError(
164 "Signaling client is not connected.".to_string(),
165 ));
166 }
167
168 let client = self.signaling_client.as_ref();
169
170 let criteria = route_candidates_request::NodeSelectionCriteria {
171 candidate_count,
172 ranking_factors: Vec::new(),
173 minimal_dependency_requirement: None,
174 minimal_health_requirement: None,
175 };
176
177 let route_request = RouteCandidatesRequest {
178 target_type: target_type.clone(),
179 criteria: Some(criteria),
180 client_location: None,
181 };
182
183 let route_response = client
184 .send_route_candidates_request(actor_id.clone(), credential.clone(), route_request)
185 .await
186 .map_err(|e| {
187 actr_protocol::ProtocolError::TransportError(format!(
188 "Route candidates request failed: {e}"
189 ))
190 })?;
191
192 match route_response.result {
193 Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
194 Ok(success.candidates)
195 }
196 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
197 Err(actr_protocol::ProtocolError::TransportError(format!(
198 "Route candidates error {}: {}",
199 err.code, err.message
200 )))
201 }
202 None => Err(actr_protocol::ProtocolError::TransportError(
203 "Invalid route candidates response: missing result".to_string(),
204 )),
205 }
206 }
207
208 /// Handle incoming message envelope
209 ///
210 /// # Performance Analysis
211 /// 1. create_context: ~10ns
212 /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
213 /// 3. User business logic: variable
214 ///
215 /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
216 ///
217 /// # Zero-cost Abstraction
218 /// - Compiler can inline entire call chain
219 /// - Match branches can be directly expanded
220 /// - Final generated code approaches hand-written match expression
221 ///
222 /// # Parameters
223 /// - `envelope`: The RPC envelope containing the message
224 /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
225 ///
226 /// # caller_id Design
227 ///
228 /// **Why not in RpcEnvelope?**
229 /// - Transport layer (WebRTC/Mailbox) already knows the sender
230 /// - All connections are direct P2P (no intermediaries)
231 /// - Storing in envelope would be redundant duplication
232 ///
233 /// **How it works:**
234 /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
235 /// - Only decoded when creating Context (once per message)
236 /// - Shell calls pass `None` (local process, no remote caller)
237 /// - Remote calls decode from `MessageRecord.from`
238 ///
239 /// **trace_id vs request_id:**
240 /// - `trace_id`: Distributed tracing across entire call chain (A → B → C)
241 /// - `request_id`: Unique identifier for each request-response pair
242 /// - Both kept for flexibility in complex scenarios
243 /// - Single-hop calls: effectively identical
244 /// - Multi-hop calls: trace_id spans all hops, request_id per hop
245 pub async fn handle_incoming(
246 &self,
247 envelope: RpcEnvelope,
248 caller_id: Option<&ActrId>,
249 ) -> ActorResult<Bytes> {
250 use actr_framework::MessageDispatcher;
251
252 // Log received message
253 if let Some(caller) = caller_id {
254 tracing::debug!(
255 "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
256 envelope.route_key,
257 caller.to_string_repr(),
258 envelope.request_id
259 );
260 } else {
261 tracing::debug!(
262 "📨 Handling incoming message: route_key={}, request_id={}",
263 envelope.route_key,
264 envelope.request_id
265 );
266 }
267
268 // 1. Create Context with caller_id from transport layer
269 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
270 actr_protocol::ProtocolError::InvalidStateTransition(
271 "Actor ID not set - node must be started before handling messages".to_string(),
272 )
273 })?;
274 let credential = self.credential.as_ref().ok_or_else(|| {
275 actr_protocol::ProtocolError::InvalidStateTransition(
276 "Credential not set - node must be started before handling messages".to_string(),
277 )
278 })?;
279
280 let ctx = self
281 .context_factory
282 .as_ref()
283 .expect("ContextFactory must be initialized in start()")
284 .create(
285 actor_id,
286 caller_id, // caller_id from transport layer (MessageRecord.from)
287 &envelope.request_id,
288 credential,
289 );
290
291 // 2. Static MessageRouter dispatch (zero-cost abstraction)
292 // Compiler will inline entire call chain, generating code close to hand-written match
293 //
294 // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
295 let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
296 &self.workload,
297 envelope.clone(),
298 &ctx,
299 ))
300 .catch_unwind()
301 .await;
302
303 let result = match result {
304 Ok(handler_result) => handler_result,
305 Err(panic_payload) => {
306 // Handler panicked - extract panic info
307 let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
308 s.to_string()
309 } else if let Some(s) = panic_payload.downcast_ref::<String>() {
310 s.clone()
311 } else {
312 "Unknown panic payload".to_string()
313 };
314
315 tracing::error!(
316 severity = 8,
317 error_category = "handler_panic",
318 route_key = envelope.route_key,
319 request_id = %envelope.request_id,
320 "❌ Handler panicked: {}",
321 panic_info
322 );
323
324 // Return DecodeFailure error with panic info
325 // (using DecodeFailure as a proxy for "cannot process message")
326 Err(actr_protocol::ProtocolError::Actr(
327 actr_protocol::ActrError::DecodeFailure {
328 message: format!("Handler panicked: {panic_info}"),
329 },
330 ))
331 }
332 };
333
334 // 3. Log result
335 match &result {
336 Ok(_) => tracing::debug!(
337 request_id = %envelope.request_id,
338 route_key = %envelope.route_key,
339 "✅ Message handled successfully"
340 ),
341 Err(e) => tracing::error!(
342 severity = 6,
343 error_category = "handler_error",
344 request_id = %envelope.request_id,
345 route_key = %envelope.route_key,
346 "❌ Message handling failed: {:?}", e
347 ),
348 }
349
350 result
351 }
352
353 /// Start the system
354 ///
355 /// # Startup Sequence
356 /// 1. Connect to signaling server and register Actor
357 /// 2. Initialize transport layer (WebRTC)
358 /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
359 /// 4. Start Mailbox processing loop (State Path serial processing)
360 /// 5. Start Transport (begin receiving messages)
361 /// 6. Create ActrRef for Shell to interact with Workload
362 ///
363 /// # Returns
364 /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
365 pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
366 tracing::info!("🚀 Starting ActrNode");
367
368 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
369 // 1. Connect to signaling server and register
370 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
371 tracing::info!("📡 Connecting to signaling server");
372 self.signaling_client.connect().await.map_err(|e| {
373 actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
374 })?;
375 tracing::info!("✅ Connected to signaling server");
376
377 // Get ActrType
378 let actr_type = self.workload.actor_type();
379 tracing::info!("📋 Actor type: {}", actr_type.to_string_repr());
380
381 // Calculate ServiceSpec from config exports
382 let service_spec = self.config.calculate_service_spec();
383 if let Some(ref spec) = service_spec {
384 tracing::info!("📦 Service fingerprint: {}", spec.fingerprint);
385 tracing::info!("📦 Service tags: {:?}", spec.tags);
386 } else {
387 tracing::info!("📦 No proto exports, ServiceSpec is None");
388 }
389
390 // Construct protobuf RegisterRequest
391 let register_request = RegisterRequest {
392 actr_type: actr_type.clone(),
393 realm: self.config.realm,
394 service_spec,
395 acl: self.config.acl.clone(),
396 };
397
398 tracing::info!("📤 Registering actor with signaling server (protobuf)");
399
400 // Use send_register_request to send and wait for response
401 let register_response = self
402 .signaling_client
403 .send_register_request(register_request)
404 .await
405 .map_err(|e| {
406 actr_protocol::ProtocolError::TransportError(format!(
407 "Actor registration failed: {e}"
408 ))
409 })?;
410
411 // Handle RegisterResponse oneof result
412 //
413 // Collect background task handles (including unregister task) so they can be managed
414 // by ActrRefShared later.
415 let mut task_handles = Vec::new();
416
417 match register_response.result {
418 Some(register_response::Result::Success(register_ok)) => {
419 let actor_id = register_ok.actr_id;
420 let credential = register_ok.credential;
421
422 tracing::info!("✅ Registration successful");
423 tracing::info!(
424 "🆔 Assigned ActrId: {}",
425 actr_protocol::ActrIdExt::to_string_repr(&actor_id)
426 );
427 tracing::info!(
428 "🔐 Received credential (token_key_id: {})",
429 credential.token_key_id
430 );
431 tracing::info!(
432 "💓 Signaling heartbeat interval: {} seconds",
433 register_ok.signaling_heartbeat_interval_secs
434 );
435
436 // Log additional information (if available)
437 if register_ok.psk.is_some() {
438 tracing::debug!("🔑 Received PSK (bootstrap keying material)");
439 }
                if let Some(expires_at) = &register_ok.credential_expires_at {
441 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
442 }
443
444 // Store ActrId and Credential
445 self.actor_id = Some(actor_id.clone());
446 self.credential = Some(credential.clone());
447
448 // Persist identity into ContextFactory for later Context creation
449 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
450 // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
451 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
452 let shell_to_workload = self
453 .context_factory
454 .as_ref()
455 .expect("ContextFactory must exist")
456 .shell_to_workload();
457 let workload_to_shell = self
458 .context_factory
459 .as_ref()
460 .expect("ContextFactory must exist")
461 .workload_to_shell();
462 self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
463 self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
464
465 tracing::info!(
466 "✅ Inproc infrastructure already ready (created in ActrSystem::new())"
467 );
468
469 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
470 // 1.5. Create WebRTC infrastructure
471 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
472 tracing::info!("🌐 Initializing WebRTC infrastructure");
473
474 // Get MediaFrameRegistry from ContextFactory
475 let media_frame_registry = self
476 .context_factory
477 .as_ref()
478 .expect("ContextFactory must exist")
479 .media_frame_registry
480 .clone();
481
482 // Create WebRtcCoordinator
483 let coordinator =
484 Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
485 actor_id.clone(),
486 credential.clone(),
487 self.signaling_client.clone(),
488 WebRtcConfig::default(),
489 media_frame_registry,
490 ));
491
492 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
493 // 1.6. Create OutprocTransportManager + OutprocOutGate (新架构)
494 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
495 tracing::info!("🏗️ Creating OutprocTransportManager with WebRTC support");
496
497 // Create DefaultWireBuilder with WebRTC coordinator
498 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
499 let wire_builder_config = DefaultWireBuilderConfig {
500 websocket_url_template: None, // WebSocket disabled for now
501 enable_webrtc: true,
502 enable_websocket: false,
503 };
504 let wire_builder = Arc::new(DefaultWireBuilder::new(
505 Some(coordinator.clone()),
506 wire_builder_config,
507 ));
508
509 // Create OutprocTransportManager
510 use crate::transport::OutprocTransportManager;
511 let transport_manager =
512 Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
513
514 // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
515 use crate::outbound::{OutGate, OutprocOutGate};
516 let outproc_gate = Arc::new(OutprocOutGate::new(
517 transport_manager,
518 Some(coordinator.clone()), // Enable MediaTrack support
519 ));
520 let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
521
522 tracing::info!("✅ OutprocTransportManager + OutprocOutGate initialized");
523
524 // Get DataStreamRegistry from ContextFactory
525 let data_stream_registry = self
526 .context_factory
527 .as_ref()
528 .expect("ContextFactory must exist")
529 .data_stream_registry
530 .clone();
531
532 // Create WebRtcGate with shared pending_requests and DataStreamRegistry
533 let pending_requests = outproc_gate.get_pending_requests();
534 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
535 coordinator.clone(),
536 pending_requests,
537 data_stream_registry,
538 ));
539
540 // Set local_id
541 gate.set_local_id(actor_id.clone()).await;
542
543 tracing::info!(
544 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
545 );
546
547 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
548 // 1.7. Set outproc_gate in ContextFactory (completing initialization)
549 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
550 tracing::info!("🔧 Setting outproc_gate in ContextFactory");
551 self.context_factory
552 .as_mut()
553 .expect("ContextFactory must exist")
554 .set_outproc_gate(outproc_gate_enum);
555
556 tracing::info!(
557 "✅ ContextFactory fully initialized (inproc + outproc gates ready)"
558 );
559
560 // Save references (WebRtcGate kept for backward compatibility if needed)
561 self.webrtc_coordinator = Some(coordinator.clone());
562 self.webrtc_gate = Some(gate.clone());
563
564 tracing::info!("✅ WebRTC infrastructure initialized");
565
566 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
567 // 1.8. Spawn dedicated Unregister task (best-effort, with timeout)
568 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
569 //
570 // This task:
571 // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
572 // - Then sends UnregisterRequest via signaling client with a timeout
573 //
574 // NOTE: we push its JoinHandle into task_handles so it can be aborted
575 // by ActrRefShared::Drop if needed.
576 {
577 let shutdown = self.shutdown_token.clone();
578 let client = self.signaling_client.clone();
579 let actor_id_for_unreg = actor_id.clone();
580 let credential_for_unreg = credential.clone();
581 let webrtc_coordinator = self.webrtc_coordinator.clone();
582
583 let unregister_handle = tokio::spawn(async move {
584 // Wait for shutdown signal
585 shutdown.cancelled().await;
586 tracing::info!(
587 "📡 Shutdown signal received2, sending UnregisterRequest for Actor {:?}",
588 actor_id_for_unreg
589 );
590
591 // 1. 先关闭所有 WebRTC peer 连接(如果存在)
592 if let Some(coord) = webrtc_coordinator {
593 if let Err(e) = coord.close_all_peers().await {
594 tracing::warn!(
595 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
596 e
597 );
598 } else {
599 tracing::info!(
600 "✅ All WebRTC peers closed before UnregisterRequest"
601 );
602 }
603 } else {
604 tracing::debug!(
605 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
606 );
607 }
608
609 // 2. 再发送 UnregisterRequest,设置一个超时(例如 5 秒)
610 let result = tokio::time::timeout(
611 std::time::Duration::from_secs(5),
612 client.send_unregister_request(
613 actor_id_for_unreg.clone(),
614 credential_for_unreg.clone(),
615 Some("Graceful shutdown".to_string()),
616 ),
617 )
618 .await;
619 tracing::info!("UnregisterRequest result: {:?}", result);
620 match result {
621 Ok(Ok(_)) => {
622 tracing::info!(
623 "✅ UnregisterRequest sent to signaling server for Actor {:?}",
624 actor_id_for_unreg
625 );
626 }
627 Ok(Err(e)) => {
628 tracing::warn!(
629 "⚠️ Failed to send UnregisterRequest for Actor {:?}: {}",
630 actor_id_for_unreg,
631 e
632 );
633 }
634 Err(_) => {
635 tracing::warn!(
636 "⚠️ UnregisterRequest timeout (5s) for Actor {:?}",
637 actor_id_for_unreg
638 );
639 }
640 }
641 });
642
643 task_handles.push(unregister_handle);
644 }
645
646 // Spawn signaling auto-reconnect helper that reacts immediately to disconnect events.
647 crate::wire::webrtc::spawn_signaling_reconnector(
648 self.signaling_client.clone(),
649 self.shutdown_token.clone(),
650 );
651 }
652 Some(register_response::Result::Error(error)) => {
653 tracing::error!(
654 severity = 10,
655 error_category = "registration_error",
656 error_code = error.code,
657 "❌ Registration failed: code={}, message={}",
658 error.code,
659 error.message
660 );
661 return Err(actr_protocol::ProtocolError::TransportError(format!(
662 "Registration rejected: {} (code: {})",
663 error.message, error.code
664 )));
665 }
666 None => {
667 tracing::error!(
668 severity = 10,
669 error_category = "registration_error",
670 "❌ Registration response missing result"
671 );
672 return Err(actr_protocol::ProtocolError::TransportError(
673 "Invalid registration response: missing result".to_string(),
674 ));
675 }
676 }
677
678 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
679 // 2. Transport layer initialization (completed via WebRTC infrastructure)
680 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
681 tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
682
683 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
684 // 3.1 Convert to Arc (before starting background loops)
685 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
686 // Clone actor_id before moving self into Arc
687 let actor_id = self
688 .actor_id
689 .as_ref()
690 .ok_or_else(|| {
691 actr_protocol::ProtocolError::InvalidStateTransition(
692 "Actor ID not set - registration must complete before starting node"
693 .to_string(),
694 )
695 })?
696 .clone();
697 let credential = self
698 .credential
699 .as_ref()
700 .ok_or_else(|| {
701 actr_protocol::ProtocolError::InvalidStateTransition(
702 "Credential not set - node must be started before handling messages"
703 .to_string(),
704 )
705 })?
706 .clone();
707
708 let actor_id_for_shell = actor_id.clone();
709 let shutdown_token = self.shutdown_token.clone();
710 let node_ref = Arc::new(self);
711
712 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
713 // 3.5. Start WebRTC background loops (BEFORE on_start)
714 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
715 // CRITICAL: Start signaling loop before on_start() to avoid deadlock
716 // where on_start() tries to send messages but signaling loop isn't running
717 tracing::info!("🚀 Starting WebRTC background loops");
718
719 // Start WebRtcCoordinator signaling loop
720 if let Some(coordinator) = &node_ref.webrtc_coordinator {
721 coordinator.clone().start().await.map_err(|e| {
722 actr_protocol::ProtocolError::TransportError(format!(
723 "WebRtcCoordinator start failed: {e}"
724 ))
725 })?;
726 tracing::info!("✅ WebRtcCoordinator signaling loop started");
727 }
728
729 // Start WebRtcGate message receive loop (route to Mailbox)
730 if let Some(gate) = &node_ref.webrtc_gate {
731 gate.start_receive_loop(node_ref.mailbox.clone())
732 .await
733 .map_err(|e| {
734 actr_protocol::ProtocolError::TransportError(format!(
735 "WebRtcGate receive loop start failed: {e}"
736 ))
737 })?;
738 tracing::info!("✅ WebRtcGate → Mailbox routing started");
739 }
740
741 tracing::info!("✅ WebRTC background loops started");
742
743 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
744 // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
745 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
746 tracing::info!("🪝 Calling lifecycle hook: on_start");
747
748 let ctx = node_ref
749 .context_factory
750 .as_ref()
751 .expect("ContextFactory must be initialized before on_start")
752 .create(
753 &actor_id,
754 None, // caller_id
755 "bootstrap", // request_id
756 &credential,
757 );
758 node_ref.workload.on_start(&ctx).await?;
759 tracing::info!("✅ Lifecycle hook on_start completed");
760
761 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
762 // 4.6. Start Inproc receive loop (Shell → Workload)
763 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
764 tracing::info!("🔄 Starting Inproc receive loop (Shell → Workload)");
765 // Start Workload receive loop (Shell → Workload REQUEST)
766 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
767 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
768 let node = node_ref.clone();
769 let request_rx_lane = shell_to_workload
770 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
771 .await
772 .map_err(|e| {
773 actr_protocol::ProtocolError::TransportError(format!(
774 "Failed to get Workload receive lane: {e}"
775 ))
776 })?;
777 let response_tx = workload_to_shell.clone();
778 let shutdown = shutdown_token.clone();
779
780 let inproc_handle = tokio::spawn(async move {
781 loop {
782 tokio::select! {
783 _ = shutdown.cancelled() => {
784 tracing::info!("📭 Workload receive loop (Shell → Workload) received shutdown signal");
785 break;
786 }
787
788 envelope_result = request_rx_lane.recv_envelope() => {
789 match envelope_result {
790 Ok(envelope) => {
791 let request_id = envelope.request_id.clone();
792 let span = tracing::info_span!("webrtc.receive_rpc", request_id = %request_id);
793 // Extract and set tracing context from envelope
794 #[cfg(feature = "opentelemetry")]
795 {
796 use crate::wire::webrtc::trace::set_parent_from_rpc_envelope;
797 set_parent_from_rpc_envelope(&span, &envelope);
798 }
799
800 tracing::debug!("📨 Workload received REQUEST from Shell: request_id={}", request_id);
801
802 // Shell calls have no caller_id (local process communication)
803 #[cfg(feature = "opentelemetry")]
804 let span_for_inject = span.clone();
805 match node.handle_incoming(envelope.clone(), None).instrument(span).await {
806 Ok(response_bytes) => {
807 // Send RESPONSE back via workload_to_shell
808 // Keep same route_key (no prefix needed - separate channels!)
809 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
810 let mut response_envelope = RpcEnvelope {
811 route_key: envelope.route_key.clone(),
812 payload: Some(response_bytes),
813 error: None,
814 traceparent: None,
815 tracestate: None,
816 request_id: request_id.clone(),
817 metadata: Vec::new(),
818 timeout_ms: 30000,
819 };
820 // Inject tracing context
821 #[cfg(feature = "opentelemetry")]
822 {
823 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
824 inject_span_context_to_rpc(&span_for_inject, &mut response_envelope);
825 }
826
827 // Send via Workload → Shell channel
828 if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope).await {
829 tracing::error!(
830 severity = 7,
831 error_category = "transport_error",
832 request_id = %request_id,
833 "❌ Failed to send RESPONSE to Shell: {:?}", e
834 );
835 }
836 }
837 Err(e) => {
838 tracing::error!(
839 severity = 6,
840 error_category = "handler_error",
841 request_id = %request_id,
842 route_key = %envelope.route_key,
843 "❌ Workload message handling failed: {:?}", e
844 );
845
846 // Send error response (system-level error on envelope)
847 let error_response = actr_protocol::ErrorResponse {
848 code: protocol_error_to_code(&e),
849 message: e.to_string(),
850 };
851
852 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
853 let mut error_envelope = RpcEnvelope {
854 route_key: envelope.route_key.clone(),
855 payload: None,
856 error: Some(error_response),
857 traceparent: envelope.traceparent.clone(),
858 tracestate: envelope.tracestate.clone(),
859 request_id: request_id.clone(),
860 metadata: Vec::new(),
861 timeout_ms: 30000,
862 };
863 // Inject tracing context
864 #[cfg(feature = "opentelemetry")]
865 {
866 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
867 inject_span_context_to_rpc(&tracing::Span::current(), &mut error_envelope);
868 }
869
870 if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope).await {
871 tracing::error!(
872 severity = 7,
873 error_category = "transport_error",
874 request_id = %request_id,
875 "❌ Failed to send ERROR response to Shell: {:?}", e
876 );
877 }
878 }
879 }
880 }
881 Err(e) => {
882 tracing::error!(
883 severity = 8,
884 error_category = "transport_error",
885 "❌ Failed to receive from Shell → Workload lane: {:?}", e
886 );
887 break;
888 }
889 }
890 }
891 }
892 }
893 tracing::info!(
894 "✅ Workload receive loop (Shell → Workload) terminated gracefully"
895 );
896 });
897
898 task_handles.push(inproc_handle);
899 }
900 }
901 tracing::info!("✅ Workload receive loop (Shell → Workload REQUEST) started");
902
903 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
904 // 4.7. Start Shell receive loop (Workload → Shell RESPONSE)
905 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
906 tracing::info!("🔄 Starting Shell receive loop (Workload → Shell RESPONSE)");
907 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
908 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
909 let response_rx_lane = workload_to_shell
910 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
911 .await
912 .map_err(|e| {
913 actr_protocol::ProtocolError::TransportError(format!(
914 "Failed to get Shell receive lane: {e}"
915 ))
916 })?;
917 let request_mgr = shell_to_workload.clone();
918 let shutdown = shutdown_token.clone();
919
920 let shell_receive_handle = tokio::spawn(async move {
921 loop {
922 tokio::select! {
923 _ = shutdown.cancelled() => {
924 tracing::info!("📭 Shell receive loop (Workload → Shell) received shutdown signal");
925 break;
926 }
927
928 envelope_result = response_rx_lane.recv_envelope() => {
929 match envelope_result {
930 Ok(envelope) => {
931 tracing::debug!("📨 Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
932
933 // Check if response is success or error
934 match (envelope.payload, envelope.error) {
935 (Some(payload), None) => {
936 // Success response
937 if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
938 tracing::warn!(
939 severity = 4,
940 error_category = "orphan_response",
941 request_id = %envelope.request_id,
942 "⚠️ No pending request found for response: {:?}", e
943 );
944 }
945 }
946 (None, Some(error)) => {
947 // Error response - convert to ProtocolError and complete with error
948 let protocol_err = actr_protocol::ProtocolError::TransportError(
949 format!("RPC error {}: {}", error.code, error.message)
950 );
951 if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
952 tracing::warn!(
953 severity = 4,
954 error_category = "orphan_response",
955 request_id = %envelope.request_id,
956 "⚠️ No pending request found for error response: {:?}", e
957 );
958 }
959 }
960 _ => {
961 tracing::error!(
962 severity = 7,
963 error_category = "protocol_error",
964 request_id = %envelope.request_id,
965 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
966 );
967 }
968 }
969 }
970 Err(e) => {
971 tracing::error!(
972 severity = 8,
973 error_category = "transport_error",
974 "❌ Failed to receive from Workload → Shell lane: {:?}", e
975 );
976 break;
977 }
978 }
979 }
980 }
981 }
982 tracing::info!(
983 "✅ Shell receive loop (Workload → Shell) terminated gracefully"
984 );
985 });
986
987 task_handles.push(shell_receive_handle);
988 }
989 }
990 tracing::info!("✅ Shell receive loop (Workload → Shell RESPONSE) started");
991
992 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
993 // 5. Start Mailbox processing loop (State Path)
994 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
995 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
996 {
997 let node = node_ref.clone();
998 let mailbox = node_ref.mailbox.clone();
999 let gate = node_ref.webrtc_gate.clone();
1000 let shutdown = shutdown_token.clone();
1001
1002 let mailbox_handle = tokio::spawn(async move {
1003 loop {
1004 tokio::select! {
1005 // Listen for shutdown signal
1006 _ = shutdown.cancelled() => {
1007 tracing::info!("📭 Mailbox loop received shutdown signal");
1008 break;
1009 }
1010
1011 // Dequeue messages (by priority)
1012 result = mailbox.dequeue() => {
1013 match result {
1014 Ok(messages) => {
1015 if messages.is_empty() {
1016 // Queue empty, sleep briefly
1017 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1018 continue;
1019 }
1020
1021 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
1022
1023 // Process messages one by one
1024 for msg_record in messages {
1025 // Deserialize RpcEnvelope (Protobuf)
1026 match RpcEnvelope::decode(&msg_record.payload[..]) {
1027 Ok(envelope) => {
1028 let request_id = envelope.request_id.clone();
1029 tracing::debug!("📦 Processing message: request_id={}", request_id);
1030
1031 // Decode caller_id from MessageRecord.from (transport layer)
1032 let caller_id_result = ActrId::decode(&msg_record.from[..]);
1033 let caller_id_ref = caller_id_result.as_ref().ok();
1034
1035 if caller_id_ref.is_none() {
1036 tracing::warn!(
1037 request_id = %request_id,
1038 "⚠️ Failed to decode caller_id from MessageRecord.from"
1039 );
1040 }
1041
1042 // Call handle_incoming with caller_id from transport layer
1043 match node.handle_incoming(envelope.clone(), caller_id_ref).await {
1044 Ok(response_bytes) => {
1045 // Send response (reuse request_id)
1046 if let Some(ref gate) = gate {
1047 // Use already decoded caller_id
1048 match caller_id_result {
1049 Ok(caller) => {
1050 // Construct response RpcEnvelope (reuse request_id!)
1051 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1052 let mut response_envelope = RpcEnvelope {
1053 request_id, // Reuse!
1054 route_key: envelope.route_key.clone(),
1055 payload: Some(response_bytes),
1056 error: None,
1057 traceparent: envelope.traceparent.clone(),
1058 tracestate: envelope.tracestate.clone(),
1059 metadata: Vec::new(), // Response doesn't need extra metadata
1060 timeout_ms: 30000,
1061 };
1062 // Inject tracing context
1063 #[cfg(feature = "opentelemetry")]
1064 {
1065 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
1066 inject_span_context_to_rpc(&tracing::Span::current(), &mut response_envelope);
1067 }
1068
1069 if let Err(e) = gate.send_response(&caller, response_envelope).await {
1070 tracing::error!(
1071 severity = 7,
1072 error_category = "transport_error",
1073 request_id = %envelope.request_id,
1074 "❌ Failed to send response: {:?}", e
1075 );
1076 }
1077 }
1078 Err(e) => {
1079 tracing::error!(
1080 severity = 8,
1081 error_category = "protobuf_decode",
1082 request_id = %envelope.request_id,
1083 "❌ Failed to decode caller_id: {:?}", e
1084 );
1085 }
1086 }
1087 }
1088
1089 // ACK message
1090 if let Err(e) = mailbox.ack(msg_record.id).await {
1091 tracing::error!(
1092 severity = 9,
1093 error_category = "mailbox_error",
1094 request_id = %envelope.request_id,
1095 message_id = %msg_record.id,
1096 "❌ Mailbox ACK failed: {:?}", e
1097 );
1098 }
1099 }
1100 Err(e) => {
1101 tracing::error!(
1102 severity = 6,
1103 error_category = "handler_error",
1104 request_id = %envelope.request_id,
1105 route_key = %envelope.route_key,
1106 "❌ handle_incoming failed: {:?}", e
1107 );
1108 // ACK to avoid infinite retries
1109 // Application errors are caller's responsibility
1110 let _ = mailbox.ack(msg_record.id).await;
1111 }
1112 }
1113 }
1114 Err(e) => {
1115 // Poison message - cannot decode RpcEnvelope
1116 tracing::error!(
1117 severity = 9,
1118 error_category = "protobuf_decode",
1119 message_id = %msg_record.id,
1120 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
1121 );
1122
1123 // Write to Dead Letter Queue
1124 use actr_mailbox::DlqRecord;
1125 use chrono::Utc;
1126 use uuid::Uuid;
1127
1128 let dlq_record = DlqRecord {
1129 id: Uuid::new_v4(),
1130 original_message_id: Some(msg_record.id.to_string()),
1131 from: Some(msg_record.from.clone()),
1132 to: node.actor_id.as_ref().map(|id| {
1133 let mut buf = Vec::new();
1134 id.encode(&mut buf).unwrap();
1135 buf
1136 }),
1137 raw_bytes: msg_record.payload.clone(),
1138 error_message: format!("Protobuf decode failed: {e}"),
1139 error_category: "protobuf_decode".to_string(),
1140 trace_id: format!("mailbox-{}", msg_record.id), // Fallback trace_id
1141 request_id: None,
1142 created_at: Utc::now(),
1143 redrive_attempts: 0,
1144 last_redrive_at: None,
1145 context: Some(format!(
1146 r#"{{"source":"mailbox","priority":"{}"}}"#,
1147 match msg_record.priority {
1148 actr_mailbox::MessagePriority::High => "high",
1149 actr_mailbox::MessagePriority::Normal => "normal",
1150 }
1151 )),
1152 };
1153
1154 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
1155 tracing::error!(
1156 severity = 10,
1157 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
1158 );
1159 } else {
1160 tracing::warn!(
1161 severity = 9,
1162 "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
1163 );
1164 }
1165
1166 // ACK the poison message to remove from mailbox
1167 let _ = mailbox.ack(msg_record.id).await;
1168 }
1169 }
1170 }
1171 }
1172 Err(e) => {
1173 tracing::error!(
1174 severity = 9,
1175 error_category = "mailbox_error",
1176 "❌ Mailbox dequeue failed: {:?}", e
1177 );
1178 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1179 }
1180 }
1181 }
1182 }
1183 }
1184 tracing::info!("✅ Mailbox processing loop terminated gracefully");
1185 });
1186
1187 task_handles.push(mailbox_handle);
1188 }
1189 tracing::info!("✅ Mailbox processing loop started");
1190
1191 tracing::info!("✅ ActrNode started successfully");
1192
1193 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1194 // 6. Create ActrRef for Shell to interact with Workload
1195 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1196 use crate::actr_ref::{ActrRef, ActrRefShared};
1197 use crate::outbound::InprocOutGate;
1198
1199 // Create InprocOutGate from shell_to_workload transport manager
1200 let shell_to_workload = node_ref
1201 .inproc_mgr
1202 .clone()
1203 .expect("inproc_mgr must be initialized");
1204 let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
1205
1206 // Create ActrRefShared
1207 let actr_ref_shared = Arc::new(ActrRefShared {
1208 actor_id: actor_id_for_shell.clone(),
1209 inproc_gate,
1210 shutdown_token: shutdown_token.clone(),
1211 task_handles: tokio::sync::Mutex::new(task_handles),
1212 });
1213
1214 // Create ActrRef
1215 let actr_ref = ActrRef::new(actr_ref_shared, node_ref);
1216
1217 tracing::info!("✅ ActrRef created (Shell → Workload communication handle)");
1218
1219 Ok(actr_ref)
1220 }
1221}