actr_runtime/lifecycle/actr_node.rs
1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use actr_framework::{Bytes, Workload};
4use actr_protocol::prost::Message as ProstMessage;
5use actr_protocol::{
6 ActorResult, ActrId, ActrType, PayloadType, RouteCandidatesRequest, RpcEnvelope,
7 route_candidates_request,
8};
9use futures_util::FutureExt;
10use std::sync::Arc;
11use tokio_util::sync::CancellationToken;
12
13use crate::context_factory::ContextFactory;
14use crate::transport::InprocTransportManager;
15
16// Use types from sub-crates
17use crate::wire::webrtc::{SignalingClient, WebRtcConfig};
18use actr_mailbox::{DeadLetterQueue, Mailbox};
19use actr_protocol::{AIdCredential, RegisterRequest, register_response};
20
21// Use extension traits from actr-protocol
22use actr_protocol::{ActrIdExt, ActrTypeExt};
23
/// ActrNode - ActrSystem + Workload (1:1 composition)
///
/// Owns the runtime state for a single actor: configuration, the workload,
/// the persistent mailbox, signaling, and the transport managers that are
/// wired up during `start()`.
///
/// # Generic Parameters
/// - `W`: Workload type
///
/// # MessageDispatcher Association
/// - Statically associated via W::Dispatcher
/// - Does not store Dispatcher instance (not even ZST needed)
/// - Dispatch calls entirely through type system
pub struct ActrNode<W: Workload> {
    /// Runtime configuration (realm, ACL, proto exports used to build the ServiceSpec)
    pub(crate) config: actr_config::Config,

    /// Workload instance (the only business logic)
    pub(crate) workload: Arc<W>,

    /// SQLite persistent mailbox (messages are dequeued and processed serially)
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// Context factory (created before `start()`; `None` means not yet initialized).
    /// Identity and the outproc gate are filled in during `start()`.
    pub(crate) context_factory: Option<ContextFactory>,

    /// Signaling client (registration, unregistration, route discovery, WebRTC signaling)
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID (assigned by the signaling server; `None` until registration succeeds)
    pub(crate) actor_id: Option<ActrId>,

    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
    pub(crate) credential: Option<AIdCredential>,

    /// WebRTC coordinator (created after startup)
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,

    /// WebRTC Gate (created after startup)
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// Shell → Workload Transport Manager
    ///
    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
    pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,

    /// Workload → Shell Transport Manager
    ///
    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
    pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,

    /// Shutdown token for graceful shutdown; background loops select on its cancellation
    pub(crate) shutdown_token: CancellationToken,
}
77
78/// Map ProtocolError to error code for ErrorResponse
79fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
80 use actr_protocol::ProtocolError;
81 match err {
82 ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
83 ProtocolError::Uri(_) => 400, // Bad Request - URI parsing error
84 ProtocolError::Name(_) => 400, // Bad Request - invalid name
85 ProtocolError::SerializationError(_) => 500, // Internal Server Error
86 ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
87 ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
88 ProtocolError::EncodeError(_) => 500, // Internal Server Error
89 ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
90 ProtocolError::TransportError(_) => 503, // Service Unavailable
91 ProtocolError::Timeout => 504, // Gateway Timeout
92 ProtocolError::TargetNotFound(_) => 404, // Not Found
93 ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
94 ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
95 }
96}
97
98impl<W: Workload> ActrNode<W> {
99 /// Get Inproc Transport Manager
100 ///
101 /// # Returns
102 /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
103 /// - `None`: Not yet started (need to call start() first)
104 ///
105 /// # Use Cases
106 /// - Workload internals need to communicate with Shell
107 /// - Create custom LatencyFirst/MediaTrack channels
108 pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
109 self.inproc_mgr.clone()
110 }
111
112 /// Get ActorId (if registration has completed)
113 pub fn actor_id(&self) -> Option<&ActrId> {
114 self.actor_id.as_ref()
115 }
116
117 /// Get credential (if registration has completed)
118 pub fn credential(&self) -> Option<&AIdCredential> {
119 self.credential.as_ref()
120 }
121
122 /// Get signaling client (for manual control such as UnregisterRequest)
123 pub fn signaling_client(&self) -> Arc<dyn SignalingClient> {
124 self.signaling_client.clone()
125 }
126
127 /// Get shutdown token for this node
128 pub fn shutdown_token(&self) -> CancellationToken {
129 self.shutdown_token.clone()
130 }
131
132 /// Discover remote actors of the specified type via signaling server.
133 ///
134 /// This requests best route candidates via `RouteCandidatesRequest` using the node's existing registration.
135 ///
136 /// The method returns the ordered list of candidate `ActrId`s reported by the signaling server.
137 ///
138 /// # Errors
139 /// - Returns `InvalidStateTransition` if the node is not started (no actor_id/credential).
140 /// The node must be started via `start()` before calling this method.
141 /// - Returns `TransportError` if the signaling client is not connected.
142 pub async fn discover_route_candidates(
143 &self,
144 target_type: &ActrType,
145 candidate_count: u32,
146 ) -> ActorResult<Vec<ActrId>> {
147 // Check if node is started (has actor_id and credential)
148 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
149 actr_protocol::ProtocolError::InvalidStateTransition(
150 "Node is not started. Call start() first.".to_string(),
151 )
152 })?;
153
154 let credential = self.credential.as_ref().ok_or_else(|| {
155 actr_protocol::ProtocolError::InvalidStateTransition(
156 "Node is not started. Call start() first.".to_string(),
157 )
158 })?;
159
160 // Check if the signaling client is connected
161 if !self.signaling_client.is_connected() {
162 return Err(actr_protocol::ProtocolError::TransportError(
163 "Signaling client is not connected.".to_string(),
164 ));
165 }
166
167 let client = self.signaling_client.as_ref();
168
169 let criteria = route_candidates_request::NodeSelectionCriteria {
170 candidate_count,
171 ranking_factors: Vec::new(),
172 minimal_dependency_requirement: None,
173 minimal_health_requirement: None,
174 };
175
176 let route_request = RouteCandidatesRequest {
177 target_type: target_type.clone(),
178 criteria: Some(criteria),
179 client_location: None,
180 };
181
182 let route_response = client
183 .send_route_candidates_request(actor_id.clone(), credential.clone(), route_request)
184 .await
185 .map_err(|e| {
186 actr_protocol::ProtocolError::TransportError(format!(
187 "Route candidates request failed: {e}"
188 ))
189 })?;
190
191 match route_response.result {
192 Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
193 Ok(success.candidates)
194 }
195 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
196 Err(actr_protocol::ProtocolError::TransportError(format!(
197 "Route candidates error {}: {}",
198 err.code, err.message
199 )))
200 }
201 None => Err(actr_protocol::ProtocolError::TransportError(
202 "Invalid route candidates response: missing result".to_string(),
203 )),
204 }
205 }
206
207 /// Handle incoming message envelope
208 ///
209 /// # Performance Analysis
210 /// 1. create_context: ~10ns
211 /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
212 /// 3. User business logic: variable
213 ///
214 /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
215 ///
216 /// # Zero-cost Abstraction
217 /// - Compiler can inline entire call chain
218 /// - Match branches can be directly expanded
219 /// - Final generated code approaches hand-written match expression
220 ///
221 /// # Parameters
222 /// - `envelope`: The RPC envelope containing the message
223 /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
224 ///
225 /// # caller_id Design
226 ///
227 /// **Why not in RpcEnvelope?**
228 /// - Transport layer (WebRTC/Mailbox) already knows the sender
229 /// - All connections are direct P2P (no intermediaries)
230 /// - Storing in envelope would be redundant duplication
231 ///
232 /// **How it works:**
233 /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
234 /// - Only decoded when creating Context (once per message)
235 /// - Shell calls pass `None` (local process, no remote caller)
236 /// - Remote calls decode from `MessageRecord.from`
237 ///
238 /// **trace_id vs request_id:**
239 /// - `trace_id`: Distributed tracing across entire call chain (A → B → C)
240 /// - `request_id`: Unique identifier for each request-response pair
241 /// - Both kept for flexibility in complex scenarios
242 /// - Single-hop calls: effectively identical
243 /// - Multi-hop calls: trace_id spans all hops, request_id per hop
244 pub async fn handle_incoming(
245 &self,
246 envelope: RpcEnvelope,
247 caller_id: Option<&ActrId>,
248 ) -> ActorResult<Bytes> {
249 use actr_framework::MessageDispatcher;
250
251 // Log received message
252 if let Some(caller) = caller_id {
253 tracing::debug!(
254 "📨 Handling incoming message: route_key={}, caller={}, trace_id={}, request_id={}",
255 envelope.route_key,
256 caller.to_string_repr(),
257 envelope.trace_id,
258 envelope.request_id
259 );
260 } else {
261 tracing::debug!(
262 "📨 Handling incoming message: route_key={}, trace_id={}, request_id={}",
263 envelope.route_key,
264 envelope.trace_id,
265 envelope.request_id
266 );
267 }
268
269 // 1. Create Context with caller_id from transport layer
270 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
271 actr_protocol::ProtocolError::InvalidStateTransition(
272 "Actor ID not set - node must be started before handling messages".to_string(),
273 )
274 })?;
275 let credential = self.credential.as_ref().ok_or_else(|| {
276 actr_protocol::ProtocolError::InvalidStateTransition(
277 "Credential not set - node must be started before handling messages".to_string(),
278 )
279 })?;
280
281 let ctx = self
282 .context_factory
283 .as_ref()
284 .expect("ContextFactory must be initialized in start()")
285 .create(
286 actor_id,
287 caller_id, // caller_id from transport layer (MessageRecord.from)
288 &envelope.trace_id,
289 &envelope.request_id,
290 credential,
291 );
292
293 // 2. Static MessageRouter dispatch (zero-cost abstraction)
294 // Compiler will inline entire call chain, generating code close to hand-written match
295 //
296 // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
297 let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
298 &self.workload,
299 envelope.clone(),
300 &ctx,
301 ))
302 .catch_unwind()
303 .await;
304
305 let result = match result {
306 Ok(handler_result) => handler_result,
307 Err(panic_payload) => {
308 // Handler panicked - extract panic info
309 let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
310 s.to_string()
311 } else if let Some(s) = panic_payload.downcast_ref::<String>() {
312 s.clone()
313 } else {
314 "Unknown panic payload".to_string()
315 };
316
317 tracing::error!(
318 severity = 8,
319 error_category = "handler_panic",
320 route_key = envelope.route_key,
321 trace_id = envelope.trace_id,
322 "❌ Handler panicked: {}",
323 panic_info
324 );
325
326 // Return DecodeFailure error with panic info
327 // (using DecodeFailure as a proxy for "cannot process message")
328 Err(actr_protocol::ProtocolError::Actr(
329 actr_protocol::ActrError::DecodeFailure {
330 message: format!("Handler panicked: {panic_info}"),
331 },
332 ))
333 }
334 };
335
336 // 3. Log result
337 match &result {
338 Ok(_) => tracing::debug!(
339 trace_id = %envelope.trace_id,
340 request_id = %envelope.request_id,
341 route_key = %envelope.route_key,
342 "✅ Message handled successfully"
343 ),
344 Err(e) => tracing::error!(
345 severity = 6,
346 error_category = "handler_error",
347 trace_id = %envelope.trace_id,
348 request_id = %envelope.request_id,
349 route_key = %envelope.route_key,
350 "❌ Message handling failed: {:?}", e
351 ),
352 }
353
354 result
355 }
356
357 /// Start the system
358 ///
359 /// # Startup Sequence
360 /// 1. Connect to signaling server and register Actor
361 /// 2. Initialize transport layer (WebRTC)
362 /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
363 /// 4. Start Mailbox processing loop (State Path serial processing)
364 /// 5. Start Transport (begin receiving messages)
365 /// 6. Create ActrRef for Shell to interact with Workload
366 ///
367 /// # Returns
368 /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
369 pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
370 tracing::info!("🚀 Starting ActrNode");
371
372 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
373 // 1. Connect to signaling server and register
374 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
375 tracing::info!("📡 Connecting to signaling server");
376 self.signaling_client.connect().await.map_err(|e| {
377 actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
378 })?;
379 tracing::info!("✅ Connected to signaling server");
380
381 // Get ActrType
382 let actr_type = self.workload.actor_type();
383 tracing::info!("📋 Actor type: {}", actr_type.to_string_repr());
384
385 // Calculate ServiceSpec from config exports
386 let service_spec = self.config.calculate_service_spec();
387 if let Some(ref spec) = service_spec {
388 tracing::info!("📦 Service fingerprint: {}", spec.fingerprint);
389 tracing::info!("📦 Service tags: {:?}", spec.tags);
390 } else {
391 tracing::info!("📦 No proto exports, ServiceSpec is None");
392 }
393
394 // Construct protobuf RegisterRequest
395 let register_request = RegisterRequest {
396 actr_type: actr_type.clone(),
397 realm: self.config.realm,
398 service_spec,
399 acl: self.config.acl.clone(),
400 };
401
402 tracing::info!("📤 Registering actor with signaling server (protobuf)");
403
404 // Use send_register_request to send and wait for response
405 let register_response = self
406 .signaling_client
407 .send_register_request(register_request)
408 .await
409 .map_err(|e| {
410 actr_protocol::ProtocolError::TransportError(format!(
411 "Actor registration failed: {e}"
412 ))
413 })?;
414
415 // Handle RegisterResponse oneof result
416 //
417 // Collect background task handles (including unregister task) so they can be managed
418 // by ActrRefShared later.
419 let mut task_handles = Vec::new();
420
421 match register_response.result {
422 Some(register_response::Result::Success(register_ok)) => {
423 let actor_id = register_ok.actr_id;
424 let credential = register_ok.credential;
425
426 tracing::info!("✅ Registration successful");
427 tracing::info!(
428 "🆔 Assigned ActrId: {}",
429 actr_protocol::ActrIdExt::to_string_repr(&actor_id)
430 );
431 tracing::info!(
432 "🔐 Received credential (token_key_id: {})",
433 credential.token_key_id
434 );
435 tracing::info!(
436 "💓 Signaling heartbeat interval: {} seconds",
437 register_ok.signaling_heartbeat_interval_secs
438 );
439
440 // Log additional information (if available)
441 if register_ok.psk.is_some() {
442 tracing::debug!("🔑 Received PSK (bootstrap keying material)");
443 }
444 if let Some(expires_at) = ®ister_ok.credential_expires_at {
445 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
446 }
447
448 // Store ActrId and Credential
449 self.actor_id = Some(actor_id.clone());
450 self.credential = Some(credential.clone());
451
452 // Persist identity into ContextFactory for later Context creation
453 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
454 // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
455 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
456 let shell_to_workload = self
457 .context_factory
458 .as_ref()
459 .expect("ContextFactory must exist")
460 .shell_to_workload();
461 let workload_to_shell = self
462 .context_factory
463 .as_ref()
464 .expect("ContextFactory must exist")
465 .workload_to_shell();
466 self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
467 self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
468
469 tracing::info!(
470 "✅ Inproc infrastructure already ready (created in ActrSystem::new())"
471 );
472
473 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
474 // 1.5. Create WebRTC infrastructure
475 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
476 tracing::info!("🌐 Initializing WebRTC infrastructure");
477
478 // Get MediaFrameRegistry from ContextFactory
479 let media_frame_registry = self
480 .context_factory
481 .as_ref()
482 .expect("ContextFactory must exist")
483 .media_frame_registry
484 .clone();
485
486 // Create WebRtcCoordinator
487 let coordinator =
488 Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
489 actor_id.clone(),
490 credential.clone(),
491 self.signaling_client.clone(),
492 WebRtcConfig::default(),
493 media_frame_registry,
494 ));
495
496 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
497 // 1.6. Create OutprocTransportManager + OutprocOutGate (新架构)
498 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
499 tracing::info!("🏗️ Creating OutprocTransportManager with WebRTC support");
500
501 // Create DefaultWireBuilder with WebRTC coordinator
502 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
503 let wire_builder_config = DefaultWireBuilderConfig {
504 websocket_url_template: None, // WebSocket disabled for now
505 enable_webrtc: true,
506 enable_websocket: false,
507 };
508 let wire_builder = Arc::new(DefaultWireBuilder::new(
509 Some(coordinator.clone()),
510 wire_builder_config,
511 ));
512
513 // Create OutprocTransportManager
514 use crate::transport::OutprocTransportManager;
515 let transport_manager =
516 Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
517
518 // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
519 use crate::outbound::{OutGate, OutprocOutGate};
520 let outproc_gate = Arc::new(OutprocOutGate::new(
521 transport_manager,
522 Some(coordinator.clone()), // Enable MediaTrack support
523 ));
524 let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
525
526 tracing::info!("✅ OutprocTransportManager + OutprocOutGate initialized");
527
528 // Get DataStreamRegistry from ContextFactory
529 let data_stream_registry = self
530 .context_factory
531 .as_ref()
532 .expect("ContextFactory must exist")
533 .data_stream_registry
534 .clone();
535
536 // Create WebRtcGate with shared pending_requests and DataStreamRegistry
537 let pending_requests = outproc_gate.get_pending_requests();
538 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
539 coordinator.clone(),
540 pending_requests,
541 data_stream_registry,
542 ));
543
544 // Set local_id
545 gate.set_local_id(actor_id.clone()).await;
546
547 tracing::info!(
548 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
549 );
550
551 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
552 // 1.7. Set outproc_gate in ContextFactory (completing initialization)
553 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
554 tracing::info!("🔧 Setting outproc_gate in ContextFactory");
555 self.context_factory
556 .as_mut()
557 .expect("ContextFactory must exist")
558 .set_outproc_gate(outproc_gate_enum);
559
560 tracing::info!(
561 "✅ ContextFactory fully initialized (inproc + outproc gates ready)"
562 );
563
564 // Save references (WebRtcGate kept for backward compatibility if needed)
565 self.webrtc_coordinator = Some(coordinator.clone());
566 self.webrtc_gate = Some(gate.clone());
567
568 tracing::info!("✅ WebRTC infrastructure initialized");
569
570 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
571 // 1.8. Spawn dedicated Unregister task (best-effort, with timeout)
572 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
573 //
574 // This task:
575 // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
576 // - Then sends UnregisterRequest via signaling client with a timeout
577 //
578 // NOTE: we push its JoinHandle into task_handles so it can be aborted
579 // by ActrRefShared::Drop if needed.
580 {
581 let shutdown = self.shutdown_token.clone();
582 let client = self.signaling_client.clone();
583 let actor_id_for_unreg = actor_id.clone();
584 let credential_for_unreg = credential.clone();
585 let webrtc_coordinator = self.webrtc_coordinator.clone();
586
587 let unregister_handle = tokio::spawn(async move {
588 // Wait for shutdown signal
589 shutdown.cancelled().await;
590 tracing::info!(
591 "📡 Shutdown signal received2, sending UnregisterRequest for Actor {:?}",
592 actor_id_for_unreg
593 );
594
595 // 1. 先关闭所有 WebRTC peer 连接(如果存在)
596 if let Some(coord) = webrtc_coordinator {
597 if let Err(e) = coord.close_all_peers().await {
598 tracing::warn!(
599 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
600 e
601 );
602 } else {
603 tracing::info!(
604 "✅ All WebRTC peers closed before UnregisterRequest"
605 );
606 }
607 } else {
608 tracing::debug!(
609 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
610 );
611 }
612
613 // 2. 再发送 UnregisterRequest,设置一个超时(例如 5 秒)
614 let result = tokio::time::timeout(
615 std::time::Duration::from_secs(5),
616 client.send_unregister_request(
617 actor_id_for_unreg.clone(),
618 credential_for_unreg.clone(),
619 Some("Graceful shutdown".to_string()),
620 ),
621 )
622 .await;
623 tracing::info!("UnregisterRequest result: {:?}", result);
624 match result {
625 Ok(Ok(_)) => {
626 tracing::info!(
627 "✅ UnregisterRequest sent to signaling server for Actor {:?}",
628 actor_id_for_unreg
629 );
630 }
631 Ok(Err(e)) => {
632 tracing::warn!(
633 "⚠️ Failed to send UnregisterRequest for Actor {:?}: {}",
634 actor_id_for_unreg,
635 e
636 );
637 }
638 Err(_) => {
639 tracing::warn!(
640 "⚠️ UnregisterRequest timeout (5s) for Actor {:?}",
641 actor_id_for_unreg
642 );
643 }
644 }
645 });
646
647 task_handles.push(unregister_handle);
648 }
649
650 // Spawn signaling auto-reconnect helper that reacts immediately to disconnect events.
651 crate::wire::webrtc::spawn_signaling_reconnector(
652 self.signaling_client.clone(),
653 self.shutdown_token.clone(),
654 );
655 }
656 Some(register_response::Result::Error(error)) => {
657 tracing::error!(
658 severity = 10,
659 error_category = "registration_error",
660 error_code = error.code,
661 "❌ Registration failed: code={}, message={}",
662 error.code,
663 error.message
664 );
665 return Err(actr_protocol::ProtocolError::TransportError(format!(
666 "Registration rejected: {} (code: {})",
667 error.message, error.code
668 )));
669 }
670 None => {
671 tracing::error!(
672 severity = 10,
673 error_category = "registration_error",
674 "❌ Registration response missing result"
675 );
676 return Err(actr_protocol::ProtocolError::TransportError(
677 "Invalid registration response: missing result".to_string(),
678 ));
679 }
680 }
681
682 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
683 // 2. Transport layer initialization (completed via WebRTC infrastructure)
684 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
685 tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
686
687 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
688 // 3.1 Convert to Arc (before starting background loops)
689 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
690 // Clone actor_id before moving self into Arc
691 let actor_id = self
692 .actor_id
693 .as_ref()
694 .ok_or_else(|| {
695 actr_protocol::ProtocolError::InvalidStateTransition(
696 "Actor ID not set - registration must complete before starting node"
697 .to_string(),
698 )
699 })?
700 .clone();
701 let credential = self
702 .credential
703 .as_ref()
704 .ok_or_else(|| {
705 actr_protocol::ProtocolError::InvalidStateTransition(
706 "Credential not set - node must be started before handling messages"
707 .to_string(),
708 )
709 })?
710 .clone();
711
712 let actor_id_for_shell = actor_id.clone();
713 let shutdown_token = self.shutdown_token.clone();
714 let node_ref = Arc::new(self);
715
716 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
717 // 3.5. Start WebRTC background loops (BEFORE on_start)
718 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
719 // CRITICAL: Start signaling loop before on_start() to avoid deadlock
720 // where on_start() tries to send messages but signaling loop isn't running
721 tracing::info!("🚀 Starting WebRTC background loops");
722
723 // Start WebRtcCoordinator signaling loop
724 if let Some(coordinator) = &node_ref.webrtc_coordinator {
725 coordinator.clone().start().await.map_err(|e| {
726 actr_protocol::ProtocolError::TransportError(format!(
727 "WebRtcCoordinator start failed: {e}"
728 ))
729 })?;
730 tracing::info!("✅ WebRtcCoordinator signaling loop started");
731 }
732
733 // Start WebRtcGate message receive loop (route to Mailbox)
734 if let Some(gate) = &node_ref.webrtc_gate {
735 gate.start_receive_loop(node_ref.mailbox.clone())
736 .await
737 .map_err(|e| {
738 actr_protocol::ProtocolError::TransportError(format!(
739 "WebRtcGate receive loop start failed: {e}"
740 ))
741 })?;
742 tracing::info!("✅ WebRtcGate → Mailbox routing started");
743 }
744
745 tracing::info!("✅ WebRTC background loops started");
746
747 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
748 // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
749 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
750 tracing::info!("🪝 Calling lifecycle hook: on_start");
751
752 let ctx = node_ref
753 .context_factory
754 .as_ref()
755 .expect("ContextFactory must be initialized before on_start")
756 .create(
757 &actor_id,
758 None, // caller_id
759 "bootstrap", // trace_id
760 "bootstrap", // request_id
761 &credential,
762 );
763 node_ref.workload.on_start(&ctx).await?;
764 tracing::info!("✅ Lifecycle hook on_start completed");
765
766 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
767 // 4.6. Start Inproc receive loop (Shell → Workload)
768 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
769 tracing::info!("🔄 Starting Inproc receive loop (Shell → Workload)");
770 // Start Workload receive loop (Shell → Workload REQUEST)
771 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
772 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
773 let node = node_ref.clone();
774 let request_rx_lane = shell_to_workload
775 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
776 .await
777 .map_err(|e| {
778 actr_protocol::ProtocolError::TransportError(format!(
779 "Failed to get Workload receive lane: {e}"
780 ))
781 })?;
782 let response_tx = workload_to_shell.clone();
783 let shutdown = shutdown_token.clone();
784
785 let inproc_handle = tokio::spawn(async move {
786 loop {
787 tokio::select! {
788 _ = shutdown.cancelled() => {
789 tracing::info!("📭 Workload receive loop (Shell → Workload) received shutdown signal");
790 break;
791 }
792
793 envelope_result = request_rx_lane.recv_envelope() => {
794 match envelope_result {
795 Ok(envelope) => {
796 let request_id = envelope.request_id.clone();
797 tracing::debug!("📨 Workload received REQUEST from Shell: request_id={}", request_id);
798
799 // Shell calls have no caller_id (local process communication)
800 match node.handle_incoming(envelope.clone(), None).await {
801 Ok(response_bytes) => {
802 // Send RESPONSE back via workload_to_shell
803 // Keep same route_key (no prefix needed - separate channels!)
804 let response_envelope = RpcEnvelope {
805 route_key: envelope.route_key.clone(),
806 payload: Some(response_bytes),
807 error: None,
808 trace_id: envelope.trace_id.clone(),
809 request_id: request_id.clone(),
810 metadata: Vec::new(),
811 timeout_ms: 30000,
812 };
813
814 // Send via Workload → Shell channel
815 if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope).await {
816 tracing::error!(
817 severity = 7,
818 error_category = "transport_error",
819 trace_id = %envelope.trace_id,
820 request_id = %request_id,
821 "❌ Failed to send RESPONSE to Shell: {:?}", e
822 );
823 }
824 }
825 Err(e) => {
826 tracing::error!(
827 severity = 6,
828 error_category = "handler_error",
829 trace_id = %envelope.trace_id,
830 request_id = %request_id,
831 route_key = %envelope.route_key,
832 "❌ Workload message handling failed: {:?}", e
833 );
834
835 // Send error response (system-level error on envelope)
836 let error_response = actr_protocol::ErrorResponse {
837 code: protocol_error_to_code(&e),
838 message: e.to_string(),
839 };
840
841 let error_envelope = RpcEnvelope {
842 route_key: envelope.route_key.clone(),
843 payload: None,
844 error: Some(error_response),
845 trace_id: envelope.trace_id.clone(),
846 request_id: request_id.clone(),
847 metadata: Vec::new(),
848 timeout_ms: 30000,
849 };
850
851 if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope).await {
852 tracing::error!(
853 severity = 7,
854 error_category = "transport_error",
855 trace_id = %envelope.trace_id,
856 request_id = %request_id,
857 "❌ Failed to send ERROR response to Shell: {:?}", e
858 );
859 }
860 }
861 }
862 }
863 Err(e) => {
864 tracing::error!(
865 severity = 8,
866 error_category = "transport_error",
867 "❌ Failed to receive from Shell → Workload lane: {:?}", e
868 );
869 break;
870 }
871 }
872 }
873 }
874 }
875 tracing::info!(
876 "✅ Workload receive loop (Shell → Workload) terminated gracefully"
877 );
878 });
879
880 task_handles.push(inproc_handle);
881 }
882 }
883 tracing::info!("✅ Workload receive loop (Shell → Workload REQUEST) started");
884
885 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
886 // 4.7. Start Shell receive loop (Workload → Shell RESPONSE)
887 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
888 tracing::info!("🔄 Starting Shell receive loop (Workload → Shell RESPONSE)");
889 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
890 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
891 let response_rx_lane = workload_to_shell
892 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
893 .await
894 .map_err(|e| {
895 actr_protocol::ProtocolError::TransportError(format!(
896 "Failed to get Shell receive lane: {e}"
897 ))
898 })?;
899 let request_mgr = shell_to_workload.clone();
900 let shutdown = shutdown_token.clone();
901
902 let shell_receive_handle = tokio::spawn(async move {
903 loop {
904 tokio::select! {
905 _ = shutdown.cancelled() => {
906 tracing::info!("📭 Shell receive loop (Workload → Shell) received shutdown signal");
907 break;
908 }
909
910 envelope_result = response_rx_lane.recv_envelope() => {
911 match envelope_result {
912 Ok(envelope) => {
913 tracing::debug!("📨 Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
914
915 // Check if response is success or error
916 match (envelope.payload, envelope.error) {
917 (Some(payload), None) => {
918 // Success response
919 if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
920 tracing::warn!(
921 severity = 4,
922 error_category = "orphan_response",
923 trace_id = %envelope.trace_id,
924 request_id = %envelope.request_id,
925 "⚠️ No pending request found for response: {:?}", e
926 );
927 }
928 }
929 (None, Some(error)) => {
930 // Error response - convert to ProtocolError and complete with error
931 let protocol_err = actr_protocol::ProtocolError::TransportError(
932 format!("RPC error {}: {}", error.code, error.message)
933 );
934 if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
935 tracing::warn!(
936 severity = 4,
937 error_category = "orphan_response",
938 trace_id = %envelope.trace_id,
939 request_id = %envelope.request_id,
940 "⚠️ No pending request found for error response: {:?}", e
941 );
942 }
943 }
944 _ => {
945 tracing::error!(
946 severity = 7,
947 error_category = "protocol_error",
948 trace_id = %envelope.trace_id,
949 request_id = %envelope.request_id,
950 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
951 );
952 }
953 }
954 }
955 Err(e) => {
956 tracing::error!(
957 severity = 8,
958 error_category = "transport_error",
959 "❌ Failed to receive from Workload → Shell lane: {:?}", e
960 );
961 break;
962 }
963 }
964 }
965 }
966 }
967 tracing::info!(
968 "✅ Shell receive loop (Workload → Shell) terminated gracefully"
969 );
970 });
971
972 task_handles.push(shell_receive_handle);
973 }
974 }
975 tracing::info!("✅ Shell receive loop (Workload → Shell RESPONSE) started");
976
977 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
978 // 5. Start Mailbox processing loop (State Path)
979 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
980 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
981 {
982 let node = node_ref.clone();
983 let mailbox = node_ref.mailbox.clone();
984 let gate = node_ref.webrtc_gate.clone();
985 let shutdown = shutdown_token.clone();
986
987 let mailbox_handle = tokio::spawn(async move {
988 loop {
989 tokio::select! {
990 // Listen for shutdown signal
991 _ = shutdown.cancelled() => {
992 tracing::info!("📭 Mailbox loop received shutdown signal");
993 break;
994 }
995
996 // Dequeue messages (by priority)
997 result = mailbox.dequeue() => {
998 match result {
999 Ok(messages) => {
1000 if messages.is_empty() {
1001 // Queue empty, sleep briefly
1002 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1003 continue;
1004 }
1005
1006 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
1007
1008 // Process messages one by one
1009 for msg_record in messages {
1010 // Deserialize RpcEnvelope (Protobuf)
1011 match RpcEnvelope::decode(&msg_record.payload[..]) {
1012 Ok(envelope) => {
1013 let request_id = envelope.request_id.clone();
1014 tracing::debug!("📦 Processing message: request_id={}", request_id);
1015
1016 // Decode caller_id from MessageRecord.from (transport layer)
1017 let caller_id_result = ActrId::decode(&msg_record.from[..]);
1018 let caller_id_ref = caller_id_result.as_ref().ok();
1019
1020 if caller_id_ref.is_none() {
1021 tracing::warn!(
1022 trace_id = %envelope.trace_id,
1023 request_id = %request_id,
1024 "⚠️ Failed to decode caller_id from MessageRecord.from"
1025 );
1026 }
1027
1028 // Call handle_incoming with caller_id from transport layer
1029 match node.handle_incoming(envelope.clone(), caller_id_ref).await {
1030 Ok(response_bytes) => {
1031 // Send response (reuse request_id)
1032 if let Some(ref gate) = gate {
1033 // Use already decoded caller_id
1034 match caller_id_result {
1035 Ok(caller) => {
1036 // Construct response RpcEnvelope (reuse request_id!)
1037 let response_envelope = RpcEnvelope {
1038 request_id, // Reuse!
1039 route_key: envelope.route_key.clone(),
1040 payload: Some(response_bytes),
1041 error: None,
1042 trace_id: envelope.trace_id.clone(),
1043 metadata: Vec::new(), // Response doesn't need extra metadata
1044 timeout_ms: 30000,
1045 };
1046
1047 if let Err(e) = gate.send_response(&caller, response_envelope).await {
1048 tracing::error!(
1049 severity = 7,
1050 error_category = "transport_error",
1051 trace_id = %envelope.trace_id,
1052 request_id = %envelope.request_id,
1053 "❌ Failed to send response: {:?}", e
1054 );
1055 }
1056 }
1057 Err(e) => {
1058 tracing::error!(
1059 severity = 8,
1060 error_category = "protobuf_decode",
1061 trace_id = %envelope.trace_id,
1062 request_id = %envelope.request_id,
1063 "❌ Failed to decode caller_id: {:?}", e
1064 );
1065 }
1066 }
1067 }
1068
1069 // ACK message
1070 if let Err(e) = mailbox.ack(msg_record.id).await {
1071 tracing::error!(
1072 severity = 9,
1073 error_category = "mailbox_error",
1074 trace_id = %envelope.trace_id,
1075 request_id = %envelope.request_id,
1076 message_id = %msg_record.id,
1077 "❌ Mailbox ACK failed: {:?}", e
1078 );
1079 }
1080 }
1081 Err(e) => {
1082 tracing::error!(
1083 severity = 6,
1084 error_category = "handler_error",
1085 trace_id = %envelope.trace_id,
1086 request_id = %envelope.request_id,
1087 route_key = %envelope.route_key,
1088 "❌ handle_incoming failed: {:?}", e
1089 );
1090 // ACK to avoid infinite retries
1091 // Application errors are caller's responsibility
1092 let _ = mailbox.ack(msg_record.id).await;
1093 }
1094 }
1095 }
1096 Err(e) => {
1097 // Poison message - cannot decode RpcEnvelope
1098 tracing::error!(
1099 severity = 9,
1100 error_category = "protobuf_decode",
1101 message_id = %msg_record.id,
1102 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
1103 );
1104
1105 // Write to Dead Letter Queue
1106 use actr_mailbox::DlqRecord;
1107 use chrono::Utc;
1108 use uuid::Uuid;
1109
1110 let dlq_record = DlqRecord {
1111 id: Uuid::new_v4(),
1112 original_message_id: Some(msg_record.id.to_string()),
1113 from: Some(msg_record.from.clone()),
1114 to: node.actor_id.as_ref().map(|id| {
1115 let mut buf = Vec::new();
1116 id.encode(&mut buf).unwrap();
1117 buf
1118 }),
1119 raw_bytes: msg_record.payload.clone(),
1120 error_message: format!("Protobuf decode failed: {e}"),
1121 error_category: "protobuf_decode".to_string(),
1122 trace_id: format!("mailbox-{}", msg_record.id), // Fallback trace_id
1123 request_id: None,
1124 created_at: Utc::now(),
1125 redrive_attempts: 0,
1126 last_redrive_at: None,
1127 context: Some(format!(
1128 r#"{{"source":"mailbox","priority":"{}"}}"#,
1129 match msg_record.priority {
1130 actr_mailbox::MessagePriority::High => "high",
1131 actr_mailbox::MessagePriority::Normal => "normal",
1132 }
1133 )),
1134 };
1135
1136 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
1137 tracing::error!(
1138 severity = 10,
1139 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
1140 );
1141 } else {
1142 tracing::warn!(
1143 severity = 9,
1144 "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
1145 );
1146 }
1147
1148 // ACK the poison message to remove from mailbox
1149 let _ = mailbox.ack(msg_record.id).await;
1150 }
1151 }
1152 }
1153 }
1154 Err(e) => {
1155 tracing::error!(
1156 severity = 9,
1157 error_category = "mailbox_error",
1158 "❌ Mailbox dequeue failed: {:?}", e
1159 );
1160 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1161 }
1162 }
1163 }
1164 }
1165 }
1166 tracing::info!("✅ Mailbox processing loop terminated gracefully");
1167 });
1168
1169 task_handles.push(mailbox_handle);
1170 }
1171 tracing::info!("✅ Mailbox processing loop started");
1172
1173 tracing::info!("✅ ActrNode started successfully");
1174
1175 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1176 // 6. Create ActrRef for Shell to interact with Workload
1177 // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1178 use crate::actr_ref::{ActrRef, ActrRefShared};
1179 use crate::outbound::InprocOutGate;
1180
1181 // Create InprocOutGate from shell_to_workload transport manager
1182 let shell_to_workload = node_ref
1183 .inproc_mgr
1184 .clone()
1185 .expect("inproc_mgr must be initialized");
1186 let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
1187
1188 // Create ActrRefShared
1189 let actr_ref_shared = Arc::new(ActrRefShared {
1190 actor_id: actor_id_for_shell.clone(),
1191 inproc_gate,
1192 shutdown_token: shutdown_token.clone(),
1193 task_handles: tokio::sync::Mutex::new(task_handles),
1194 });
1195
1196 // Create ActrRef
1197 let actr_ref = ActrRef::new(actr_ref_shared, node_ref);
1198
1199 tracing::info!("✅ ActrRef created (Shell → Workload communication handle)");
1200
1201 Ok(actr_ref)
1202 }
1203}