actr_runtime/lifecycle/
actr_node.rs

1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use actr_framework::{Bytes, Workload};
4use actr_protocol::prost::Message as ProstMessage;
5use actr_protocol::{
6    ActorResult, ActrId, ActrType, PayloadType, RouteCandidatesRequest, RpcEnvelope,
7    route_candidates_request,
8};
9use futures_util::FutureExt;
10use std::sync::Arc;
11use tokio_util::sync::CancellationToken;
12use tracing::Instrument as _;
13
14use crate::context_factory::ContextFactory;
15use crate::transport::InprocTransportManager;
16
17// Use types from sub-crates
18use crate::wire::webrtc::{SignalingClient, WebRtcConfig};
19use actr_mailbox::{DeadLetterQueue, Mailbox};
20use actr_protocol::{AIdCredential, RegisterRequest, register_response};
21
22// Use extension traits from actr-protocol
23use actr_protocol::{ActrIdExt, ActrTypeExt};
24
25/// ActrNode - ActrSystem + Workload (1:1 composition)
26///
27/// # Generic Parameters
28/// - `W`: Workload type
29///
30/// # MessageDispatcher Association
31/// - Statically associated via W::Dispatcher
32/// - Does not store Dispatcher instance (not even ZST needed)
33/// - Dispatch calls entirely through type system
34pub struct ActrNode<W: Workload> {
35    /// Runtime configuration
36    pub(crate) config: actr_config::Config,
37
38    /// Workload instance (the only business logic)
39    pub(crate) workload: Arc<W>,
40
41    /// SQLite persistent mailbox
42    pub(crate) mailbox: Arc<dyn Mailbox>,
43
44    /// Dead Letter Queue for poison messages
45    pub(crate) dlq: Arc<dyn DeadLetterQueue>,
46
47    /// Context factory (created in start() after obtaining ActorId)
48    pub(crate) context_factory: Option<ContextFactory>,
49
50    /// Signaling client
51    pub(crate) signaling_client: Arc<dyn SignalingClient>,
52
53    /// Actor ID (obtained after startup)
54    pub(crate) actor_id: Option<ActrId>,
55
56    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
57    pub(crate) credential: Option<AIdCredential>,
58
59    /// WebRTC coordinator (created after startup)
60    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,
61
62    /// WebRTC Gate (created after startup)
63    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
64
65    /// Shell → Workload Transport Manager
66    ///
67    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
68    pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,
69
70    /// Workload → Shell Transport Manager
71    ///
72    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
73    pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,
74
75    /// Shutdown token for graceful shutdown
76    pub(crate) shutdown_token: CancellationToken,
77}
78
79/// Map ProtocolError to error code for ErrorResponse
80fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
81    use actr_protocol::ProtocolError;
82    match err {
83        ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
84        ProtocolError::Uri(_) => 400,  // Bad Request - URI parsing error
85        ProtocolError::Name(_) => 400, // Bad Request - invalid name
86        ProtocolError::SerializationError(_) => 500, // Internal Server Error
87        ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
88        ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
89        ProtocolError::EncodeError(_) => 500, // Internal Server Error
90        ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
91        ProtocolError::TransportError(_) => 503, // Service Unavailable
92        ProtocolError::Timeout => 504, // Gateway Timeout
93        ProtocolError::TargetNotFound(_) => 404, // Not Found
94        ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
95        ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
96    }
97}
98
99impl<W: Workload> ActrNode<W> {
100    /// Get Inproc Transport Manager
101    ///
102    /// # Returns
103    /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
104    /// - `None`: Not yet started (need to call start() first)
105    ///
106    /// # Use Cases
107    /// - Workload internals need to communicate with Shell
108    /// - Create custom LatencyFirst/MediaTrack channels
109    pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
110        self.inproc_mgr.clone()
111    }
112
113    /// Get ActorId (if registration has completed)
114    pub fn actor_id(&self) -> Option<&ActrId> {
115        self.actor_id.as_ref()
116    }
117
118    /// Get credential (if registration has completed)
119    pub fn credential(&self) -> Option<&AIdCredential> {
120        self.credential.as_ref()
121    }
122
123    /// Get signaling client (for manual control such as UnregisterRequest)
124    pub fn signaling_client(&self) -> Arc<dyn SignalingClient> {
125        self.signaling_client.clone()
126    }
127
128    /// Get shutdown token for this node
129    pub fn shutdown_token(&self) -> CancellationToken {
130        self.shutdown_token.clone()
131    }
132
133    /// Discover remote actors of the specified type via signaling server.
134    ///
135    /// This requests best route candidates via `RouteCandidatesRequest` using the node's existing registration.
136    ///
137    /// The method returns the ordered list of candidate `ActrId`s reported by the signaling server.
138    ///
139    /// # Errors
140    /// - Returns `InvalidStateTransition` if the node is not started (no actor_id/credential).
141    ///   The node must be started via `start()` before calling this method.
142    /// - Returns `TransportError` if the signaling client is not connected.
143    pub async fn discover_route_candidates(
144        &self,
145        target_type: &ActrType,
146        candidate_count: u32,
147    ) -> ActorResult<Vec<ActrId>> {
148        // Check if node is started (has actor_id and credential)
149        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
150            actr_protocol::ProtocolError::InvalidStateTransition(
151                "Node is not started. Call start() first.".to_string(),
152            )
153        })?;
154
155        let credential = self.credential.as_ref().ok_or_else(|| {
156            actr_protocol::ProtocolError::InvalidStateTransition(
157                "Node is not started. Call start() first.".to_string(),
158            )
159        })?;
160
161        // Check if the signaling client is connected
162        if !self.signaling_client.is_connected() {
163            return Err(actr_protocol::ProtocolError::TransportError(
164                "Signaling client is not connected.".to_string(),
165            ));
166        }
167
168        let client = self.signaling_client.as_ref();
169
170        let criteria = route_candidates_request::NodeSelectionCriteria {
171            candidate_count,
172            ranking_factors: Vec::new(),
173            minimal_dependency_requirement: None,
174            minimal_health_requirement: None,
175        };
176
177        let route_request = RouteCandidatesRequest {
178            target_type: target_type.clone(),
179            criteria: Some(criteria),
180            client_location: None,
181        };
182
183        let route_response = client
184            .send_route_candidates_request(actor_id.clone(), credential.clone(), route_request)
185            .await
186            .map_err(|e| {
187                actr_protocol::ProtocolError::TransportError(format!(
188                    "Route candidates request failed: {e}"
189                ))
190            })?;
191
192        match route_response.result {
193            Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
194                Ok(success.candidates)
195            }
196            Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
197                Err(actr_protocol::ProtocolError::TransportError(format!(
198                    "Route candidates error {}: {}",
199                    err.code, err.message
200                )))
201            }
202            None => Err(actr_protocol::ProtocolError::TransportError(
203                "Invalid route candidates response: missing result".to_string(),
204            )),
205        }
206    }
207
208    /// Handle incoming message envelope
209    ///
210    /// # Performance Analysis
211    /// 1. create_context: ~10ns
212    /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
213    /// 3. User business logic: variable
214    ///
215    /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
216    ///
217    /// # Zero-cost Abstraction
218    /// - Compiler can inline entire call chain
219    /// - Match branches can be directly expanded
220    /// - Final generated code approaches hand-written match expression
221    ///
222    /// # Parameters
223    /// - `envelope`: The RPC envelope containing the message
224    /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
225    ///
226    /// # caller_id Design
227    ///
228    /// **Why not in RpcEnvelope?**
229    /// - Transport layer (WebRTC/Mailbox) already knows the sender
230    /// - All connections are direct P2P (no intermediaries)
231    /// - Storing in envelope would be redundant duplication
232    ///
233    /// **How it works:**
234    /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
235    /// - Only decoded when creating Context (once per message)
236    /// - Shell calls pass `None` (local process, no remote caller)
237    /// - Remote calls decode from `MessageRecord.from`
238    ///
239    /// **trace_id vs request_id:**
240    /// - `trace_id`: Distributed tracing across entire call chain (A → B → C)
241    /// - `request_id`: Unique identifier for each request-response pair
242    /// - Both kept for flexibility in complex scenarios
243    /// - Single-hop calls: effectively identical
244    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
245    pub async fn handle_incoming(
246        &self,
247        envelope: RpcEnvelope,
248        caller_id: Option<&ActrId>,
249    ) -> ActorResult<Bytes> {
250        use actr_framework::MessageDispatcher;
251
252        // Log received message
253        if let Some(caller) = caller_id {
254            tracing::debug!(
255                "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
256                envelope.route_key,
257                caller.to_string_repr(),
258                envelope.request_id
259            );
260        } else {
261            tracing::debug!(
262                "📨 Handling incoming message: route_key={}, request_id={}",
263                envelope.route_key,
264                envelope.request_id
265            );
266        }
267
268        // 1. Create Context with caller_id from transport layer
269        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
270            actr_protocol::ProtocolError::InvalidStateTransition(
271                "Actor ID not set - node must be started before handling messages".to_string(),
272            )
273        })?;
274        let credential = self.credential.as_ref().ok_or_else(|| {
275            actr_protocol::ProtocolError::InvalidStateTransition(
276                "Credential not set - node must be started before handling messages".to_string(),
277            )
278        })?;
279
280        let ctx = self
281            .context_factory
282            .as_ref()
283            .expect("ContextFactory must be initialized in start()")
284            .create(
285                actor_id,
286                caller_id, // caller_id from transport layer (MessageRecord.from)
287                &envelope.request_id,
288                credential,
289            );
290
291        // 2. Static MessageRouter dispatch (zero-cost abstraction)
292        // Compiler will inline entire call chain, generating code close to hand-written match
293        //
294        // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
295        let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
296            &self.workload,
297            envelope.clone(),
298            &ctx,
299        ))
300        .catch_unwind()
301        .await;
302
303        let result = match result {
304            Ok(handler_result) => handler_result,
305            Err(panic_payload) => {
306                // Handler panicked - extract panic info
307                let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
308                    s.to_string()
309                } else if let Some(s) = panic_payload.downcast_ref::<String>() {
310                    s.clone()
311                } else {
312                    "Unknown panic payload".to_string()
313                };
314
315                tracing::error!(
316                    severity = 8,
317                    error_category = "handler_panic",
318                    route_key = envelope.route_key,
319                    request_id = %envelope.request_id,
320                    "❌ Handler panicked: {}",
321                    panic_info
322                );
323
324                // Return DecodeFailure error with panic info
325                // (using DecodeFailure as a proxy for "cannot process message")
326                Err(actr_protocol::ProtocolError::Actr(
327                    actr_protocol::ActrError::DecodeFailure {
328                        message: format!("Handler panicked: {panic_info}"),
329                    },
330                ))
331            }
332        };
333
334        // 3. Log result
335        match &result {
336            Ok(_) => tracing::debug!(
337                request_id = %envelope.request_id,
338                route_key = %envelope.route_key,
339                "✅ Message handled successfully"
340            ),
341            Err(e) => tracing::error!(
342                severity = 6,
343                error_category = "handler_error",
344                request_id = %envelope.request_id,
345                route_key = %envelope.route_key,
346                "❌ Message handling failed: {:?}", e
347            ),
348        }
349
350        result
351    }
352
353    /// Start the system
354    ///
355    /// # Startup Sequence
356    /// 1. Connect to signaling server and register Actor
357    /// 2. Initialize transport layer (WebRTC)
358    /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
359    /// 4. Start Mailbox processing loop (State Path serial processing)
360    /// 5. Start Transport (begin receiving messages)
361    /// 6. Create ActrRef for Shell to interact with Workload
362    ///
363    /// # Returns
364    /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
365    pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
366        tracing::info!("🚀 Starting ActrNode");
367
368        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
369        // 1. Connect to signaling server and register
370        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
371        tracing::info!("📡 Connecting to signaling server");
372        self.signaling_client.connect().await.map_err(|e| {
373            actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
374        })?;
375        tracing::info!("✅ Connected to signaling server");
376
377        // Get ActrType
378        let actr_type = self.workload.actor_type();
379        tracing::info!("📋 Actor type: {}", actr_type.to_string_repr());
380
381        // Calculate ServiceSpec from config exports
382        let service_spec = self.config.calculate_service_spec();
383        if let Some(ref spec) = service_spec {
384            tracing::info!("📦 Service fingerprint: {}", spec.fingerprint);
385            tracing::info!("📦 Service tags: {:?}", spec.tags);
386        } else {
387            tracing::info!("📦 No proto exports, ServiceSpec is None");
388        }
389
390        // Construct protobuf RegisterRequest
391        let register_request = RegisterRequest {
392            actr_type: actr_type.clone(),
393            realm: self.config.realm,
394            service_spec,
395            acl: self.config.acl.clone(),
396        };
397
398        tracing::info!("📤 Registering actor with signaling server (protobuf)");
399
400        // Use send_register_request to send and wait for response
401        let register_response = self
402            .signaling_client
403            .send_register_request(register_request)
404            .await
405            .map_err(|e| {
406                actr_protocol::ProtocolError::TransportError(format!(
407                    "Actor registration failed: {e}"
408                ))
409            })?;
410
411        // Handle RegisterResponse oneof result
412        //
413        // Collect background task handles (including unregister task) so they can be managed
414        // by ActrRefShared later.
415        let mut task_handles = Vec::new();
416
417        match register_response.result {
418            Some(register_response::Result::Success(register_ok)) => {
419                let actor_id = register_ok.actr_id;
420                let credential = register_ok.credential;
421
422                tracing::info!("✅ Registration successful");
423                tracing::info!(
424                    "🆔 Assigned ActrId: {}",
425                    actr_protocol::ActrIdExt::to_string_repr(&actor_id)
426                );
427                tracing::info!(
428                    "🔐 Received credential (token_key_id: {})",
429                    credential.token_key_id
430                );
431                tracing::info!(
432                    "💓 Signaling heartbeat interval: {} seconds",
433                    register_ok.signaling_heartbeat_interval_secs
434                );
435
436                // Log additional information (if available)
437                if register_ok.psk.is_some() {
438                    tracing::debug!("🔑 Received PSK (bootstrap keying material)");
439                }
440                if let Some(expires_at) = &register_ok.credential_expires_at {
441                    tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
442                }
443
444                // Store ActrId and Credential
445                self.actor_id = Some(actor_id.clone());
446                self.credential = Some(credential.clone());
447
448                // Persist identity into ContextFactory for later Context creation
449                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
450                // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
451                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
452                let shell_to_workload = self
453                    .context_factory
454                    .as_ref()
455                    .expect("ContextFactory must exist")
456                    .shell_to_workload();
457                let workload_to_shell = self
458                    .context_factory
459                    .as_ref()
460                    .expect("ContextFactory must exist")
461                    .workload_to_shell();
462                self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
463                self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
464
465                tracing::info!(
466                    "✅ Inproc infrastructure already ready (created in ActrSystem::new())"
467                );
468
469                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
470                // 1.5. Create WebRTC infrastructure
471                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
472                tracing::info!("🌐 Initializing WebRTC infrastructure");
473
474                // Get MediaFrameRegistry from ContextFactory
475                let media_frame_registry = self
476                    .context_factory
477                    .as_ref()
478                    .expect("ContextFactory must exist")
479                    .media_frame_registry
480                    .clone();
481
482                // Create WebRtcCoordinator
483                let coordinator =
484                    Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
485                        actor_id.clone(),
486                        credential.clone(),
487                        self.signaling_client.clone(),
488                        WebRtcConfig::default(),
489                        media_frame_registry,
490                    ));
491
492                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                // 1.6. Create OutprocTransportManager + OutprocOutGate (new architecture)
494                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
495                tracing::info!("🏗️  Creating OutprocTransportManager with WebRTC support");
496
497                // Create DefaultWireBuilder with WebRTC coordinator
498                use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
499                let wire_builder_config = DefaultWireBuilderConfig {
500                    websocket_url_template: None, // WebSocket disabled for now
501                    enable_webrtc: true,
502                    enable_websocket: false,
503                };
504                let wire_builder = Arc::new(DefaultWireBuilder::new(
505                    Some(coordinator.clone()),
506                    wire_builder_config,
507                ));
508
509                // Create OutprocTransportManager
510                use crate::transport::OutprocTransportManager;
511                let transport_manager =
512                    Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
513
514                // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
515                use crate::outbound::{OutGate, OutprocOutGate};
516                let outproc_gate = Arc::new(OutprocOutGate::new(
517                    transport_manager,
518                    Some(coordinator.clone()), // Enable MediaTrack support
519                ));
520                let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
521
522                tracing::info!("✅ OutprocTransportManager + OutprocOutGate initialized");
523
524                // Get DataStreamRegistry from ContextFactory
525                let data_stream_registry = self
526                    .context_factory
527                    .as_ref()
528                    .expect("ContextFactory must exist")
529                    .data_stream_registry
530                    .clone();
531
532                // Create WebRtcGate with shared pending_requests and DataStreamRegistry
533                let pending_requests = outproc_gate.get_pending_requests();
534                let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
535                    coordinator.clone(),
536                    pending_requests,
537                    data_stream_registry,
538                ));
539
540                // Set local_id
541                gate.set_local_id(actor_id.clone()).await;
542
543                tracing::info!(
544                    "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
545                );
546
547                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
548                // 1.7. Set outproc_gate in ContextFactory (completing initialization)
549                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
550                tracing::info!("🔧 Setting outproc_gate in ContextFactory");
551                self.context_factory
552                    .as_mut()
553                    .expect("ContextFactory must exist")
554                    .set_outproc_gate(outproc_gate_enum);
555
556                tracing::info!(
557                    "✅ ContextFactory fully initialized (inproc + outproc gates ready)"
558                );
559
560                // Save references (WebRtcGate kept for backward compatibility if needed)
561                self.webrtc_coordinator = Some(coordinator.clone());
562                self.webrtc_gate = Some(gate.clone());
563
564                tracing::info!("✅ WebRTC infrastructure initialized");
565
566                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
567                // 1.8. Spawn dedicated Unregister task (best-effort, with timeout)
568                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
569                //
570                // This task:
571                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
572                // - Then sends UnregisterRequest via signaling client with a timeout
573                //
574                // NOTE: we push its JoinHandle into task_handles so it can be aborted
575                // by ActrRefShared::Drop if needed.
576                {
577                    let shutdown = self.shutdown_token.clone();
578                    let client = self.signaling_client.clone();
579                    let actor_id_for_unreg = actor_id.clone();
580                    let credential_for_unreg = credential.clone();
581                    let webrtc_coordinator = self.webrtc_coordinator.clone();
582
583                    let unregister_handle = tokio::spawn(async move {
584                        // Wait for shutdown signal
585                        shutdown.cancelled().await;
586                        tracing::info!(
587                            "📡 Shutdown signal received2, sending UnregisterRequest for Actor {:?}",
588                            actor_id_for_unreg
589                        );
590
                        // 1. First close all WebRTC peer connections (if any)
592                        if let Some(coord) = webrtc_coordinator {
593                            if let Err(e) = coord.close_all_peers().await {
594                                tracing::warn!(
595                                    "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
596                                    e
597                                );
598                            } else {
599                                tracing::info!(
600                                    "✅ All WebRTC peers closed before UnregisterRequest"
601                                );
602                            }
603                        } else {
604                            tracing::debug!(
605                                "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
606                            );
607                        }
608
                        // 2. Then send the UnregisterRequest, bounded by a timeout (e.g. 5 seconds)
610                        let result = tokio::time::timeout(
611                            std::time::Duration::from_secs(5),
612                            client.send_unregister_request(
613                                actor_id_for_unreg.clone(),
614                                credential_for_unreg.clone(),
615                                Some("Graceful shutdown".to_string()),
616                            ),
617                        )
618                        .await;
619                        tracing::info!("UnregisterRequest result: {:?}", result);
620                        match result {
621                            Ok(Ok(_)) => {
622                                tracing::info!(
623                                    "✅ UnregisterRequest sent to signaling server for Actor {:?}",
624                                    actor_id_for_unreg
625                                );
626                            }
627                            Ok(Err(e)) => {
628                                tracing::warn!(
629                                    "⚠️ Failed to send UnregisterRequest for Actor {:?}: {}",
630                                    actor_id_for_unreg,
631                                    e
632                                );
633                            }
634                            Err(_) => {
635                                tracing::warn!(
636                                    "⚠️ UnregisterRequest timeout (5s) for Actor {:?}",
637                                    actor_id_for_unreg
638                                );
639                            }
640                        }
641                    });
642
643                    task_handles.push(unregister_handle);
644                }
645
646                // Spawn signaling auto-reconnect helper that reacts immediately to disconnect events.
647                crate::wire::webrtc::spawn_signaling_reconnector(
648                    self.signaling_client.clone(),
649                    self.shutdown_token.clone(),
650                );
651            }
652            Some(register_response::Result::Error(error)) => {
653                tracing::error!(
654                    severity = 10,
655                    error_category = "registration_error",
656                    error_code = error.code,
657                    "❌ Registration failed: code={}, message={}",
658                    error.code,
659                    error.message
660                );
661                return Err(actr_protocol::ProtocolError::TransportError(format!(
662                    "Registration rejected: {} (code: {})",
663                    error.message, error.code
664                )));
665            }
666            None => {
667                tracing::error!(
668                    severity = 10,
669                    error_category = "registration_error",
670                    "❌ Registration response missing result"
671                );
672                return Err(actr_protocol::ProtocolError::TransportError(
673                    "Invalid registration response: missing result".to_string(),
674                ));
675            }
676        }
677
678        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
679        // 2. Transport layer initialization (completed via WebRTC infrastructure)
680        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
681        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
682
683        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
684        // 3.1 Convert to Arc (before starting background loops)
685        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
686        // Clone actor_id before moving self into Arc
687        let actor_id = self
688            .actor_id
689            .as_ref()
690            .ok_or_else(|| {
691                actr_protocol::ProtocolError::InvalidStateTransition(
692                    "Actor ID not set - registration must complete before starting node"
693                        .to_string(),
694                )
695            })?
696            .clone();
697        let credential = self
698            .credential
699            .as_ref()
700            .ok_or_else(|| {
701                actr_protocol::ProtocolError::InvalidStateTransition(
702                    "Credential not set - node must be started before handling messages"
703                        .to_string(),
704                )
705            })?
706            .clone();
707
708        let actor_id_for_shell = actor_id.clone();
709        let shutdown_token = self.shutdown_token.clone();
710        let node_ref = Arc::new(self);
711
712        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
713        // 3.5. Start WebRTC background loops (BEFORE on_start)
714        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
715        // CRITICAL: Start signaling loop before on_start() to avoid deadlock
716        // where on_start() tries to send messages but signaling loop isn't running
717        tracing::info!("🚀 Starting WebRTC background loops");
718
719        // Start WebRtcCoordinator signaling loop
720        if let Some(coordinator) = &node_ref.webrtc_coordinator {
721            coordinator.clone().start().await.map_err(|e| {
722                actr_protocol::ProtocolError::TransportError(format!(
723                    "WebRtcCoordinator start failed: {e}"
724                ))
725            })?;
726            tracing::info!("✅ WebRtcCoordinator signaling loop started");
727        }
728
729        // Start WebRtcGate message receive loop (route to Mailbox)
730        if let Some(gate) = &node_ref.webrtc_gate {
731            gate.start_receive_loop(node_ref.mailbox.clone())
732                .await
733                .map_err(|e| {
734                    actr_protocol::ProtocolError::TransportError(format!(
735                        "WebRtcGate receive loop start failed: {e}"
736                    ))
737                })?;
738            tracing::info!("✅ WebRtcGate → Mailbox routing started");
739        }
740
741        tracing::info!("✅ WebRTC background loops started");
742
743        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
744        // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
745        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
746        tracing::info!("🪝 Calling lifecycle hook: on_start");
747
748        let ctx = node_ref
749            .context_factory
750            .as_ref()
751            .expect("ContextFactory must be initialized before on_start")
752            .create(
753                &actor_id,
754                None,        // caller_id
755                "bootstrap", // request_id
756                &credential,
757            );
758        node_ref.workload.on_start(&ctx).await?;
759        tracing::info!("✅ Lifecycle hook on_start completed");
760
761        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
762        // 4.6. Start Inproc receive loop (Shell → Workload)
763        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
764        tracing::info!("🔄 Starting Inproc receive loop (Shell → Workload)");
765        // Start Workload receive loop (Shell → Workload REQUEST)
766        if let Some(shell_to_workload) = &node_ref.inproc_mgr {
767            if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
768                let node = node_ref.clone();
769                let request_rx_lane = shell_to_workload
770                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
771                    .await
772                    .map_err(|e| {
773                        actr_protocol::ProtocolError::TransportError(format!(
774                            "Failed to get Workload receive lane: {e}"
775                        ))
776                    })?;
777                let response_tx = workload_to_shell.clone();
778                let shutdown = shutdown_token.clone();
779
780                let inproc_handle = tokio::spawn(async move {
781                    loop {
782                        tokio::select! {
783                            _ = shutdown.cancelled() => {
784                                tracing::info!("📭 Workload receive loop (Shell → Workload) received shutdown signal");
785                                break;
786                            }
787
788                            envelope_result = request_rx_lane.recv_envelope() => {
789                                match envelope_result {
790                                    Ok(envelope) => {
791                                        let request_id = envelope.request_id.clone();
792                                        let span = tracing::info_span!("webrtc.receive_rpc", request_id = %request_id);
793                                        // Extract and set tracing context from envelope
794                                        #[cfg(feature = "opentelemetry")]
795                                        {
796                                            use crate::wire::webrtc::trace::set_parent_from_rpc_envelope;
797                                            set_parent_from_rpc_envelope(&span, &envelope);
798                                        }
799
800                                        tracing::debug!("📨 Workload received REQUEST from Shell: request_id={}", request_id);
801
802                                        // Shell calls have no caller_id (local process communication)
803                                        #[cfg(feature = "opentelemetry")]
804                                        let span_for_inject = span.clone();
805                                        match node.handle_incoming(envelope.clone(), None).instrument(span).await {
806                                            Ok(response_bytes) => {
807                                                // Send RESPONSE back via workload_to_shell
808                                                // Keep same route_key (no prefix needed - separate channels!)
809                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
810                                                let mut response_envelope = RpcEnvelope {
811                                                    route_key: envelope.route_key.clone(),
812                                                    payload: Some(response_bytes),
813                                                    error: None,
814                                                    traceparent: None,
815                                                    tracestate: None,
816                                                    request_id: request_id.clone(),
817                                                    metadata: Vec::new(),
818                                                    timeout_ms: 30000,
819                                                };
820                                                // Inject tracing context
821                                                #[cfg(feature = "opentelemetry")]
822                                                {
823                                                    use crate::wire::webrtc::trace::inject_span_context_to_rpc;
824                                                    inject_span_context_to_rpc(&span_for_inject, &mut response_envelope);
825                                                }
826
827                                                // Send via Workload → Shell channel
828                                                if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope).await {
829                                                    tracing::error!(
830                                                        severity = 7,
831                                                        error_category = "transport_error",
832                                                        request_id = %request_id,
833                                                        "❌ Failed to send RESPONSE to Shell: {:?}", e
834                                                    );
835                                                }
836                                            }
837                                            Err(e) => {
838                                                tracing::error!(
839                                                    severity = 6,
840                                                    error_category = "handler_error",
841                                                    request_id = %request_id,
842                                                    route_key = %envelope.route_key,
843                                                    "❌ Workload message handling failed: {:?}", e
844                                                );
845
846                                                // Send error response (system-level error on envelope)
847                                                let error_response = actr_protocol::ErrorResponse {
848                                                    code: protocol_error_to_code(&e),
849                                                    message: e.to_string(),
850                                                };
851
852                                                #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
853                                                let mut error_envelope = RpcEnvelope {
854                                                    route_key: envelope.route_key.clone(),
855                                                    payload: None,
856                                                    error: Some(error_response),
857                                                    traceparent: envelope.traceparent.clone(),
858                                                    tracestate: envelope.tracestate.clone(),
859                                                    request_id: request_id.clone(),
860                                                    metadata: Vec::new(),
861                                                    timeout_ms: 30000,
862                                                };
863                                                // Inject tracing context
864                                                #[cfg(feature = "opentelemetry")]
865                                                {
866                                                    use crate::wire::webrtc::trace::inject_span_context_to_rpc;
867                                                    inject_span_context_to_rpc(&tracing::Span::current(), &mut error_envelope);
868                                                }
869
870                                                if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope).await {
871                                                    tracing::error!(
872                                                        severity = 7,
873                                                        error_category = "transport_error",
874                                                        request_id = %request_id,
875                                                        "❌ Failed to send ERROR response to Shell: {:?}", e
876                                                    );
877                                                }
878                                            }
879                                        }
880                                    }
881                                    Err(e) => {
882                                        tracing::error!(
883                                            severity = 8,
884                                            error_category = "transport_error",
885                                            "❌ Failed to receive from Shell → Workload lane: {:?}", e
886                                        );
887                                        break;
888                                    }
889                                }
890                            }
891                        }
892                    }
893                    tracing::info!(
894                        "✅ Workload receive loop (Shell → Workload) terminated gracefully"
895                    );
896                });
897
898                task_handles.push(inproc_handle);
899            }
900        }
901        tracing::info!("✅ Workload receive loop (Shell → Workload REQUEST) started");
902
903        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
904        // 4.7. Start Shell receive loop (Workload → Shell RESPONSE)
905        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
906        tracing::info!("🔄 Starting Shell receive loop (Workload → Shell RESPONSE)");
907        if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
908            if let Some(shell_to_workload) = &node_ref.inproc_mgr {
909                let response_rx_lane = workload_to_shell
910                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
911                    .await
912                    .map_err(|e| {
913                        actr_protocol::ProtocolError::TransportError(format!(
914                            "Failed to get Shell receive lane: {e}"
915                        ))
916                    })?;
917                let request_mgr = shell_to_workload.clone();
918                let shutdown = shutdown_token.clone();
919
920                let shell_receive_handle = tokio::spawn(async move {
921                    loop {
922                        tokio::select! {
923                            _ = shutdown.cancelled() => {
924                                tracing::info!("📭 Shell receive loop (Workload → Shell) received shutdown signal");
925                                break;
926                            }
927
928                            envelope_result = response_rx_lane.recv_envelope() => {
929                                match envelope_result {
930                                    Ok(envelope) => {
931                                        tracing::debug!("📨 Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
932
933                                        // Check if response is success or error
934                                        match (envelope.payload, envelope.error) {
935                                            (Some(payload), None) => {
936                                                // Success response
937                                                if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
938                                                    tracing::warn!(
939                                                        severity = 4,
940                                                        error_category = "orphan_response",
941                                                        request_id = %envelope.request_id,
942                                                        "⚠️  No pending request found for response: {:?}", e
943                                                    );
944                                                }
945                                            }
946                                            (None, Some(error)) => {
947                                                // Error response - convert to ProtocolError and complete with error
948                                                let protocol_err = actr_protocol::ProtocolError::TransportError(
949                                                    format!("RPC error {}: {}", error.code, error.message)
950                                                );
951                                                if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
952                                                    tracing::warn!(
953                                                        severity = 4,
954                                                        error_category = "orphan_response",
955                                                        request_id = %envelope.request_id,
956                                                        "⚠️  No pending request found for error response: {:?}", e
957                                                    );
958                                                }
959                                            }
960                                            _ => {
961                                                tracing::error!(
962                                                    severity = 7,
963                                                    error_category = "protocol_error",
964                                                    request_id = %envelope.request_id,
965                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
966                                                );
967                                            }
968                                        }
969                                    }
970                                    Err(e) => {
971                                        tracing::error!(
972                                            severity = 8,
973                                            error_category = "transport_error",
974                                            "❌ Failed to receive from Workload → Shell lane: {:?}", e
975                                        );
976                                        break;
977                                    }
978                                }
979                            }
980                        }
981                    }
982                    tracing::info!(
983                        "✅ Shell receive loop (Workload → Shell) terminated gracefully"
984                    );
985                });
986
987                task_handles.push(shell_receive_handle);
988            }
989        }
990        tracing::info!("✅ Shell receive loop (Workload → Shell RESPONSE) started");
991
992        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
993        // 5. Start Mailbox processing loop (State Path)
994        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
995        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
996        {
997            let node = node_ref.clone();
998            let mailbox = node_ref.mailbox.clone();
999            let gate = node_ref.webrtc_gate.clone();
1000            let shutdown = shutdown_token.clone();
1001
1002            let mailbox_handle = tokio::spawn(async move {
1003                loop {
1004                    tokio::select! {
1005                        // Listen for shutdown signal
1006                        _ = shutdown.cancelled() => {
1007                            tracing::info!("📭 Mailbox loop received shutdown signal");
1008                            break;
1009                        }
1010
1011                        // Dequeue messages (by priority)
1012                        result = mailbox.dequeue() => {
1013                            match result {
1014                                Ok(messages) => {
1015                                    if messages.is_empty() {
1016                                        // Queue empty, sleep briefly
1017                                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1018                                        continue;
1019                                    }
1020
1021                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
1022
1023                                    // Process messages one by one
1024                                    for msg_record in messages {
1025                                        // Deserialize RpcEnvelope (Protobuf)
1026                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
1027                                            Ok(envelope) => {
1028                                                let request_id = envelope.request_id.clone();
1029                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
1030
1031                                                // Decode caller_id from MessageRecord.from (transport layer)
1032                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
1033                                                let caller_id_ref = caller_id_result.as_ref().ok();
1034
1035                                                if caller_id_ref.is_none() {
1036                                                    tracing::warn!(
1037                                                        request_id = %request_id,
1038                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
1039                                                    );
1040                                                }
1041
1042                                                // Call handle_incoming with caller_id from transport layer
1043                                                match node.handle_incoming(envelope.clone(), caller_id_ref).await {
1044                                                    Ok(response_bytes) => {
1045                                                        // Send response (reuse request_id)
1046                                                        if let Some(ref gate) = gate {
1047                                                            // Use already decoded caller_id
1048                                                            match caller_id_result {
1049                                                                Ok(caller) => {
1050                                                                    // Construct response RpcEnvelope (reuse request_id!)
1051                                                                    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1052                                                                    let mut response_envelope = RpcEnvelope {
1053                                                                        request_id,  // Reuse!
1054                                                                        route_key: envelope.route_key.clone(),
1055                                                                        payload: Some(response_bytes),
1056                                                                        error: None,
1057                                                                        traceparent: envelope.traceparent.clone(),
1058                                                                        tracestate: envelope.tracestate.clone(),
1059                                                                        metadata: Vec::new(),  // Response doesn't need extra metadata
1060                                                                        timeout_ms: 30000,
1061                                                                    };
1062                                                                    // Inject tracing context
1063                                                                    #[cfg(feature = "opentelemetry")]
1064                                                                    {
1065                                                                        use crate::wire::webrtc::trace::inject_span_context_to_rpc;
1066                                                                        inject_span_context_to_rpc(&tracing::Span::current(), &mut response_envelope);
1067                                                                    }
1068
1069                                                                    if let Err(e) = gate.send_response(&caller, response_envelope).await {
1070                                                                        tracing::error!(
1071                                                                            severity = 7,
1072                                                                            error_category = "transport_error",
1073                                                                            request_id = %envelope.request_id,
1074                                                                            "❌ Failed to send response: {:?}", e
1075                                                                        );
1076                                                                    }
1077                                                                }
1078                                                                Err(e) => {
1079                                                                    tracing::error!(
1080                                                                        severity = 8,
1081                                                                        error_category = "protobuf_decode",
1082                                                                        request_id = %envelope.request_id,
1083                                                                        "❌ Failed to decode caller_id: {:?}", e
1084                                                                    );
1085                                                                }
1086                                                            }
1087                                                        }
1088
1089                                                        // ACK message
1090                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
1091                                                            tracing::error!(
1092                                                                severity = 9,
1093                                                                error_category = "mailbox_error",
1094                                                                request_id = %envelope.request_id,
1095                                                                message_id = %msg_record.id,
1096                                                                "❌ Mailbox ACK failed: {:?}", e
1097                                                            );
1098                                                        }
1099                                                    }
1100                                                    Err(e) => {
1101                                                        tracing::error!(
1102                                                            severity = 6,
1103                                                            error_category = "handler_error",
1104                                                            request_id = %envelope.request_id,
1105                                                            route_key = %envelope.route_key,
1106                                                            "❌ handle_incoming failed: {:?}", e
1107                                                        );
1108                                                        // ACK to avoid infinite retries
1109                                                        // Application errors are caller's responsibility
1110                                                        let _ = mailbox.ack(msg_record.id).await;
1111                                                    }
1112                                                }
1113                                            }
1114                                            Err(e) => {
1115                                                // Poison message - cannot decode RpcEnvelope
1116                                                tracing::error!(
1117                                                    severity = 9,
1118                                                    error_category = "protobuf_decode",
1119                                                    message_id = %msg_record.id,
1120                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
1121                                                );
1122
1123                                                // Write to Dead Letter Queue
1124                                                use actr_mailbox::DlqRecord;
1125                                                use chrono::Utc;
1126                                                use uuid::Uuid;
1127
1128                                                let dlq_record = DlqRecord {
1129                                                    id: Uuid::new_v4(),
1130                                                    original_message_id: Some(msg_record.id.to_string()),
1131                                                    from: Some(msg_record.from.clone()),
1132                                                    to: node.actor_id.as_ref().map(|id| {
1133                                                        let mut buf = Vec::new();
1134                                                        id.encode(&mut buf).unwrap();
1135                                                        buf
1136                                                    }),
1137                                                    raw_bytes: msg_record.payload.clone(),
1138                                                    error_message: format!("Protobuf decode failed: {e}"),
1139                                                    error_category: "protobuf_decode".to_string(),
1140                                                    trace_id: format!("mailbox-{}", msg_record.id),  // Fallback trace_id
1141                                                    request_id: None,
1142                                                    created_at: Utc::now(),
1143                                                    redrive_attempts: 0,
1144                                                    last_redrive_at: None,
1145                                                    context: Some(format!(
1146                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
1147                                                        match msg_record.priority {
1148                                                            actr_mailbox::MessagePriority::High => "high",
1149                                                            actr_mailbox::MessagePriority::Normal => "normal",
1150                                                        }
1151                                                    )),
1152                                                };
1153
1154                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
1155                                                    tracing::error!(
1156                                                        severity = 10,
1157                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
1158                                                    );
1159                                                } else {
1160                                                    tracing::warn!(
1161                                                        severity = 9,
1162                                                        "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
1163                                                    );
1164                                                }
1165
1166                                                // ACK the poison message to remove from mailbox
1167                                                let _ = mailbox.ack(msg_record.id).await;
1168                                            }
1169                                        }
1170                                    }
1171                                }
1172                                Err(e) => {
1173                                    tracing::error!(
1174                                        severity = 9,
1175                                        error_category = "mailbox_error",
1176                                        "❌ Mailbox dequeue failed: {:?}", e
1177                                    );
1178                                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1179                                }
1180                            }
1181                        }
1182                    }
1183                }
1184                tracing::info!("✅ Mailbox processing loop terminated gracefully");
1185            });
1186
1187            task_handles.push(mailbox_handle);
1188        }
1189        tracing::info!("✅ Mailbox processing loop started");
1190
1191        tracing::info!("✅ ActrNode started successfully");
1192
1193        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1194        // 6. Create ActrRef for Shell to interact with Workload
1195        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1196        use crate::actr_ref::{ActrRef, ActrRefShared};
1197        use crate::outbound::InprocOutGate;
1198
1199        // Create InprocOutGate from shell_to_workload transport manager
1200        let shell_to_workload = node_ref
1201            .inproc_mgr
1202            .clone()
1203            .expect("inproc_mgr must be initialized");
1204        let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
1205
1206        // Create ActrRefShared
1207        let actr_ref_shared = Arc::new(ActrRefShared {
1208            actor_id: actor_id_for_shell.clone(),
1209            inproc_gate,
1210            shutdown_token: shutdown_token.clone(),
1211            task_handles: tokio::sync::Mutex::new(task_handles),
1212        });
1213
1214        // Create ActrRef
1215        let actr_ref = ActrRef::new(actr_ref_shared, node_ref);
1216
1217        tracing::info!("✅ ActrRef created (Shell → Workload communication handle)");
1218
1219        Ok(actr_ref)
1220    }
1221}