actr_runtime/lifecycle/
actr_node.rs

1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use actr_framework::{Bytes, Workload};
4use actr_protocol::prost::Message as ProstMessage;
5use actr_protocol::{
6    ActorResult, ActrId, ActrType, PayloadType, RouteCandidatesRequest, RpcEnvelope,
7    route_candidates_request,
8};
9use futures_util::FutureExt;
10use std::sync::Arc;
11use tokio_util::sync::CancellationToken;
12
13use crate::context_factory::ContextFactory;
14use crate::transport::InprocTransportManager;
15
16// Use types from sub-crates
17use crate::wire::webrtc::{SignalingClient, WebRtcConfig};
18use actr_mailbox::{DeadLetterQueue, Mailbox};
19use actr_protocol::{AIdCredential, RegisterRequest, register_response};
20
21// Use extension traits from actr-protocol
22use actr_protocol::{ActrIdExt, ActrTypeExt};
23
/// ActrNode - ActrSystem + Workload (1:1 composition)
///
/// Bundles one workload with the runtime resources used to serve it: mailbox,
/// dead letter queue, signaling client, WebRTC components and the two in-process
/// transport managers. The identity, WebRTC and in-process manager fields are
/// `Option`s that `start()` populates once registration with the signaling
/// server succeeds.
///
/// # Generic Parameters
/// - `W`: Workload type
///
/// # MessageDispatcher Association
/// - Statically associated via W::Dispatcher
/// - Does not store Dispatcher instance (not even ZST needed)
/// - Dispatch calls entirely through type system
pub struct ActrNode<W: Workload> {
    /// Runtime configuration
    ///
    /// `start()` reads `realm`, `acl` and `calculate_service_spec()` from it
    /// when building the `RegisterRequest`.
    pub(crate) config: actr_config::Config,

    /// Workload instance (the only business logic)
    pub(crate) workload: Arc<W>,

    /// Persistent mailbox (described as SQLite-backed; the field itself only
    /// requires `dyn Mailbox`)
    ///
    /// Handed to the WebRtcGate receive loop in `start()`.
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// Context factory
    ///
    /// `start()` and `handle_incoming()` both require this to be `Some`;
    /// `start()` does not create it, it only completes it by calling
    /// `set_outproc_gate` after registration.
    // NOTE(review): the construction site is outside this file section -
    // presumably ActrSystem::new(), as the log message in start() suggests.
    pub(crate) context_factory: Option<ContextFactory>,

    /// Signaling client
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID (assigned by the signaling server, stored by `start()`)
    pub(crate) actor_id: Option<ActrId>,

    /// Actor Credential (stored by `start()`, used for subsequent authentication messages)
    pub(crate) credential: Option<AIdCredential>,

    /// WebRTC coordinator (created in `start()` after registration succeeds)
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,

    /// WebRTC Gate (created in `start()` after registration succeeds)
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// Shell → Workload Transport Manager
    ///
    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing).
    /// Set in `start()` from `ContextFactory::shell_to_workload()`.
    pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,

    /// Workload → Shell Transport Manager
    ///
    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's).
    /// Set in `start()` from `ContextFactory::workload_to_shell()`.
    pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,

    /// Shutdown token for graceful shutdown
    ///
    /// Cancelling it wakes the unregister task spawned in `start()`.
    pub(crate) shutdown_token: CancellationToken,
}
77
78/// Map ProtocolError to error code for ErrorResponse
79fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
80    use actr_protocol::ProtocolError;
81    match err {
82        ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
83        ProtocolError::Uri(_) => 400,  // Bad Request - URI parsing error
84        ProtocolError::Name(_) => 400, // Bad Request - invalid name
85        ProtocolError::SerializationError(_) => 500, // Internal Server Error
86        ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
87        ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
88        ProtocolError::EncodeError(_) => 500, // Internal Server Error
89        ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
90        ProtocolError::TransportError(_) => 503, // Service Unavailable
91        ProtocolError::Timeout => 504, // Gateway Timeout
92        ProtocolError::TargetNotFound(_) => 404, // Not Found
93        ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
94        ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
95    }
96}
97
98impl<W: Workload> ActrNode<W> {
99    /// Get Inproc Transport Manager
100    ///
101    /// # Returns
102    /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
103    /// - `None`: Not yet started (need to call start() first)
104    ///
105    /// # Use Cases
106    /// - Workload internals need to communicate with Shell
107    /// - Create custom LatencyFirst/MediaTrack channels
108    pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
109        self.inproc_mgr.clone()
110    }
111
112    /// Get ActorId (if registration has completed)
113    pub fn actor_id(&self) -> Option<&ActrId> {
114        self.actor_id.as_ref()
115    }
116
117    /// Get credential (if registration has completed)
118    pub fn credential(&self) -> Option<&AIdCredential> {
119        self.credential.as_ref()
120    }
121
122    /// Get signaling client (for manual control such as UnregisterRequest)
123    pub fn signaling_client(&self) -> Arc<dyn SignalingClient> {
124        self.signaling_client.clone()
125    }
126
127    /// Get shutdown token for this node
128    pub fn shutdown_token(&self) -> CancellationToken {
129        self.shutdown_token.clone()
130    }
131
132    /// Discover remote actors of the specified type via signaling server.
133    ///
134    /// This requests best route candidates via `RouteCandidatesRequest` using the node's existing registration.
135    ///
136    /// The method returns the ordered list of candidate `ActrId`s reported by the signaling server.
137    ///
138    /// # Errors
139    /// - Returns `InvalidStateTransition` if the node is not started (no actor_id/credential).
140    ///   The node must be started via `start()` before calling this method.
141    /// - Returns `TransportError` if the signaling client is not connected.
142    pub async fn discover_route_candidates(
143        &self,
144        target_type: &ActrType,
145        candidate_count: u32,
146    ) -> ActorResult<Vec<ActrId>> {
147        // Check if node is started (has actor_id and credential)
148        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
149            actr_protocol::ProtocolError::InvalidStateTransition(
150                "Node is not started. Call start() first.".to_string(),
151            )
152        })?;
153
154        let credential = self.credential.as_ref().ok_or_else(|| {
155            actr_protocol::ProtocolError::InvalidStateTransition(
156                "Node is not started. Call start() first.".to_string(),
157            )
158        })?;
159
160        // Check if the signaling client is connected
161        if !self.signaling_client.is_connected() {
162            return Err(actr_protocol::ProtocolError::TransportError(
163                "Signaling client is not connected.".to_string(),
164            ));
165        }
166
167        let client = self.signaling_client.as_ref();
168
169        let criteria = route_candidates_request::NodeSelectionCriteria {
170            candidate_count,
171            ranking_factors: Vec::new(),
172            minimal_dependency_requirement: None,
173            minimal_health_requirement: None,
174        };
175
176        let route_request = RouteCandidatesRequest {
177            target_type: target_type.clone(),
178            criteria: Some(criteria),
179            client_location: None,
180        };
181
182        let route_response = client
183            .send_route_candidates_request(actor_id.clone(), credential.clone(), route_request)
184            .await
185            .map_err(|e| {
186                actr_protocol::ProtocolError::TransportError(format!(
187                    "Route candidates request failed: {e}"
188                ))
189            })?;
190
191        match route_response.result {
192            Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
193                Ok(success.candidates)
194            }
195            Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
196                Err(actr_protocol::ProtocolError::TransportError(format!(
197                    "Route candidates error {}: {}",
198                    err.code, err.message
199                )))
200            }
201            None => Err(actr_protocol::ProtocolError::TransportError(
202                "Invalid route candidates response: missing result".to_string(),
203            )),
204        }
205    }
206
207    /// Handle incoming message envelope
208    ///
209    /// # Performance Analysis
210    /// 1. create_context: ~10ns
211    /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
212    /// 3. User business logic: variable
213    ///
214    /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
215    ///
216    /// # Zero-cost Abstraction
217    /// - Compiler can inline entire call chain
218    /// - Match branches can be directly expanded
219    /// - Final generated code approaches hand-written match expression
220    ///
221    /// # Parameters
222    /// - `envelope`: The RPC envelope containing the message
223    /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
224    ///
225    /// # caller_id Design
226    ///
227    /// **Why not in RpcEnvelope?**
228    /// - Transport layer (WebRTC/Mailbox) already knows the sender
229    /// - All connections are direct P2P (no intermediaries)
230    /// - Storing in envelope would be redundant duplication
231    ///
232    /// **How it works:**
233    /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
234    /// - Only decoded when creating Context (once per message)
235    /// - Shell calls pass `None` (local process, no remote caller)
236    /// - Remote calls decode from `MessageRecord.from`
237    ///
238    /// **trace_id vs request_id:**
239    /// - `trace_id`: Distributed tracing across entire call chain (A → B → C)
240    /// - `request_id`: Unique identifier for each request-response pair
241    /// - Both kept for flexibility in complex scenarios
242    /// - Single-hop calls: effectively identical
243    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
244    pub async fn handle_incoming(
245        &self,
246        envelope: RpcEnvelope,
247        caller_id: Option<&ActrId>,
248    ) -> ActorResult<Bytes> {
249        use actr_framework::MessageDispatcher;
250
251        // Log received message
252        if let Some(caller) = caller_id {
253            tracing::debug!(
254                "📨 Handling incoming message: route_key={}, caller={}, trace_id={}, request_id={}",
255                envelope.route_key,
256                caller.to_string_repr(),
257                envelope.trace_id,
258                envelope.request_id
259            );
260        } else {
261            tracing::debug!(
262                "📨 Handling incoming message: route_key={}, trace_id={}, request_id={}",
263                envelope.route_key,
264                envelope.trace_id,
265                envelope.request_id
266            );
267        }
268
269        // 1. Create Context with caller_id from transport layer
270        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
271            actr_protocol::ProtocolError::InvalidStateTransition(
272                "Actor ID not set - node must be started before handling messages".to_string(),
273            )
274        })?;
275        let credential = self.credential.as_ref().ok_or_else(|| {
276            actr_protocol::ProtocolError::InvalidStateTransition(
277                "Credential not set - node must be started before handling messages".to_string(),
278            )
279        })?;
280
281        let ctx = self
282            .context_factory
283            .as_ref()
284            .expect("ContextFactory must be initialized in start()")
285            .create(
286                actor_id,
287                caller_id, // caller_id from transport layer (MessageRecord.from)
288                &envelope.trace_id,
289                &envelope.request_id,
290                credential,
291            );
292
293        // 2. Static MessageRouter dispatch (zero-cost abstraction)
294        // Compiler will inline entire call chain, generating code close to hand-written match
295        //
296        // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
297        let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
298            &self.workload,
299            envelope.clone(),
300            &ctx,
301        ))
302        .catch_unwind()
303        .await;
304
305        let result = match result {
306            Ok(handler_result) => handler_result,
307            Err(panic_payload) => {
308                // Handler panicked - extract panic info
309                let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
310                    s.to_string()
311                } else if let Some(s) = panic_payload.downcast_ref::<String>() {
312                    s.clone()
313                } else {
314                    "Unknown panic payload".to_string()
315                };
316
317                tracing::error!(
318                    severity = 8,
319                    error_category = "handler_panic",
320                    route_key = envelope.route_key,
321                    trace_id = envelope.trace_id,
322                    "❌ Handler panicked: {}",
323                    panic_info
324                );
325
326                // Return DecodeFailure error with panic info
327                // (using DecodeFailure as a proxy for "cannot process message")
328                Err(actr_protocol::ProtocolError::Actr(
329                    actr_protocol::ActrError::DecodeFailure {
330                        message: format!("Handler panicked: {panic_info}"),
331                    },
332                ))
333            }
334        };
335
336        // 3. Log result
337        match &result {
338            Ok(_) => tracing::debug!(
339                trace_id = %envelope.trace_id,
340                request_id = %envelope.request_id,
341                route_key = %envelope.route_key,
342                "✅ Message handled successfully"
343            ),
344            Err(e) => tracing::error!(
345                severity = 6,
346                error_category = "handler_error",
347                trace_id = %envelope.trace_id,
348                request_id = %envelope.request_id,
349                route_key = %envelope.route_key,
350                "❌ Message handling failed: {:?}", e
351            ),
352        }
353
354        result
355    }
356
357    /// Start the system
358    ///
359    /// # Startup Sequence
360    /// 1. Connect to signaling server and register Actor
361    /// 2. Initialize transport layer (WebRTC)
362    /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
363    /// 4. Start Mailbox processing loop (State Path serial processing)
364    /// 5. Start Transport (begin receiving messages)
365    /// 6. Create ActrRef for Shell to interact with Workload
366    ///
367    /// # Returns
368    /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
369    pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
370        tracing::info!("🚀 Starting ActrNode");
371
372        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
373        // 1. Connect to signaling server and register
374        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
375        tracing::info!("📡 Connecting to signaling server");
376        self.signaling_client.connect().await.map_err(|e| {
377            actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
378        })?;
379        tracing::info!("✅ Connected to signaling server");
380
381        // Get ActrType
382        let actr_type = self.workload.actor_type();
383        tracing::info!("📋 Actor type: {}", actr_type.to_string_repr());
384
385        // Calculate ServiceSpec from config exports
386        let service_spec = self.config.calculate_service_spec();
387        if let Some(ref spec) = service_spec {
388            tracing::info!("📦 Service fingerprint: {}", spec.fingerprint);
389            tracing::info!("📦 Service tags: {:?}", spec.tags);
390        } else {
391            tracing::info!("📦 No proto exports, ServiceSpec is None");
392        }
393
394        // Construct protobuf RegisterRequest
395        let register_request = RegisterRequest {
396            actr_type: actr_type.clone(),
397            realm: self.config.realm,
398            service_spec,
399            acl: self.config.acl.clone(),
400        };
401
402        tracing::info!("📤 Registering actor with signaling server (protobuf)");
403
404        // Use send_register_request to send and wait for response
405        let register_response = self
406            .signaling_client
407            .send_register_request(register_request)
408            .await
409            .map_err(|e| {
410                actr_protocol::ProtocolError::TransportError(format!(
411                    "Actor registration failed: {e}"
412                ))
413            })?;
414
415        // Handle RegisterResponse oneof result
416        //
417        // Collect background task handles (including unregister task) so they can be managed
418        // by ActrRefShared later.
419        let mut task_handles = Vec::new();
420
421        match register_response.result {
422            Some(register_response::Result::Success(register_ok)) => {
423                let actor_id = register_ok.actr_id;
424                let credential = register_ok.credential;
425
426                tracing::info!("✅ Registration successful");
427                tracing::info!(
428                    "🆔 Assigned ActrId: {}",
429                    actr_protocol::ActrIdExt::to_string_repr(&actor_id)
430                );
431                tracing::info!(
432                    "🔐 Received credential (token_key_id: {})",
433                    credential.token_key_id
434                );
435                tracing::info!(
436                    "💓 Signaling heartbeat interval: {} seconds",
437                    register_ok.signaling_heartbeat_interval_secs
438                );
439
440                // Log additional information (if available)
441                if register_ok.psk.is_some() {
442                    tracing::debug!("🔑 Received PSK (bootstrap keying material)");
443                }
444                if let Some(expires_at) = &register_ok.credential_expires_at {
445                    tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
446                }
447
448                // Store ActrId and Credential
449                self.actor_id = Some(actor_id.clone());
450                self.credential = Some(credential.clone());
451
452                // Persist identity into ContextFactory for later Context creation
453                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
454                // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
455                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
456                let shell_to_workload = self
457                    .context_factory
458                    .as_ref()
459                    .expect("ContextFactory must exist")
460                    .shell_to_workload();
461                let workload_to_shell = self
462                    .context_factory
463                    .as_ref()
464                    .expect("ContextFactory must exist")
465                    .workload_to_shell();
466                self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
467                self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
468
469                tracing::info!(
470                    "✅ Inproc infrastructure already ready (created in ActrSystem::new())"
471                );
472
473                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
474                // 1.5. Create WebRTC infrastructure
475                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
476                tracing::info!("🌐 Initializing WebRTC infrastructure");
477
478                // Get MediaFrameRegistry from ContextFactory
479                let media_frame_registry = self
480                    .context_factory
481                    .as_ref()
482                    .expect("ContextFactory must exist")
483                    .media_frame_registry
484                    .clone();
485
486                // Create WebRtcCoordinator
487                let coordinator =
488                    Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
489                        actor_id.clone(),
490                        credential.clone(),
491                        self.signaling_client.clone(),
492                        WebRtcConfig::default(),
493                        media_frame_registry,
494                    ));
495
496                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
497                // 1.6. Create OutprocTransportManager + OutprocOutGate (新架构)
498                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
499                tracing::info!("🏗️  Creating OutprocTransportManager with WebRTC support");
500
501                // Create DefaultWireBuilder with WebRTC coordinator
502                use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
503                let wire_builder_config = DefaultWireBuilderConfig {
504                    websocket_url_template: None, // WebSocket disabled for now
505                    enable_webrtc: true,
506                    enable_websocket: false,
507                };
508                let wire_builder = Arc::new(DefaultWireBuilder::new(
509                    Some(coordinator.clone()),
510                    wire_builder_config,
511                ));
512
513                // Create OutprocTransportManager
514                use crate::transport::OutprocTransportManager;
515                let transport_manager =
516                    Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
517
518                // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
519                use crate::outbound::{OutGate, OutprocOutGate};
520                let outproc_gate = Arc::new(OutprocOutGate::new(
521                    transport_manager,
522                    Some(coordinator.clone()), // Enable MediaTrack support
523                ));
524                let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
525
526                tracing::info!("✅ OutprocTransportManager + OutprocOutGate initialized");
527
528                // Get DataStreamRegistry from ContextFactory
529                let data_stream_registry = self
530                    .context_factory
531                    .as_ref()
532                    .expect("ContextFactory must exist")
533                    .data_stream_registry
534                    .clone();
535
536                // Create WebRtcGate with shared pending_requests and DataStreamRegistry
537                let pending_requests = outproc_gate.get_pending_requests();
538                let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
539                    coordinator.clone(),
540                    pending_requests,
541                    data_stream_registry,
542                ));
543
544                // Set local_id
545                gate.set_local_id(actor_id.clone()).await;
546
547                tracing::info!(
548                    "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
549                );
550
551                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
552                // 1.7. Set outproc_gate in ContextFactory (completing initialization)
553                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
554                tracing::info!("🔧 Setting outproc_gate in ContextFactory");
555                self.context_factory
556                    .as_mut()
557                    .expect("ContextFactory must exist")
558                    .set_outproc_gate(outproc_gate_enum);
559
560                tracing::info!(
561                    "✅ ContextFactory fully initialized (inproc + outproc gates ready)"
562                );
563
564                // Save references (WebRtcGate kept for backward compatibility if needed)
565                self.webrtc_coordinator = Some(coordinator.clone());
566                self.webrtc_gate = Some(gate.clone());
567
568                tracing::info!("✅ WebRTC infrastructure initialized");
569
570                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
571                // 1.8. Spawn dedicated Unregister task (best-effort, with timeout)
572                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
573                //
574                // This task:
575                // - Waits for shutdown_token to be cancelled (e.g., wait_for_ctrl_c_and_shutdown)
576                // - Then sends UnregisterRequest via signaling client with a timeout
577                //
578                // NOTE: we push its JoinHandle into task_handles so it can be aborted
579                // by ActrRefShared::Drop if needed.
580                {
581                    let shutdown = self.shutdown_token.clone();
582                    let client = self.signaling_client.clone();
583                    let actor_id_for_unreg = actor_id.clone();
584                    let credential_for_unreg = credential.clone();
585                    let webrtc_coordinator = self.webrtc_coordinator.clone();
586
587                    let unregister_handle = tokio::spawn(async move {
588                        // Wait for shutdown signal
589                        shutdown.cancelled().await;
590                        tracing::info!(
591                            "📡 Shutdown signal received2, sending UnregisterRequest for Actor {:?}",
592                            actor_id_for_unreg
593                        );
594
595                        // 1. 先关闭所有 WebRTC peer 连接(如果存在)
596                        if let Some(coord) = webrtc_coordinator {
597                            if let Err(e) = coord.close_all_peers().await {
598                                tracing::warn!(
599                                    "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
600                                    e
601                                );
602                            } else {
603                                tracing::info!(
604                                    "✅ All WebRTC peers closed before UnregisterRequest"
605                                );
606                            }
607                        } else {
608                            tracing::debug!(
609                                "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
610                            );
611                        }
612
613                        // 2. 再发送 UnregisterRequest,设置一个超时(例如 5 秒)
614                        let result = tokio::time::timeout(
615                            std::time::Duration::from_secs(5),
616                            client.send_unregister_request(
617                                actor_id_for_unreg.clone(),
618                                credential_for_unreg.clone(),
619                                Some("Graceful shutdown".to_string()),
620                            ),
621                        )
622                        .await;
623                        tracing::info!("UnregisterRequest result: {:?}", result);
624                        match result {
625                            Ok(Ok(_)) => {
626                                tracing::info!(
627                                    "✅ UnregisterRequest sent to signaling server for Actor {:?}",
628                                    actor_id_for_unreg
629                                );
630                            }
631                            Ok(Err(e)) => {
632                                tracing::warn!(
633                                    "⚠️ Failed to send UnregisterRequest for Actor {:?}: {}",
634                                    actor_id_for_unreg,
635                                    e
636                                );
637                            }
638                            Err(_) => {
639                                tracing::warn!(
640                                    "⚠️ UnregisterRequest timeout (5s) for Actor {:?}",
641                                    actor_id_for_unreg
642                                );
643                            }
644                        }
645                    });
646
647                    task_handles.push(unregister_handle);
648                }
649
650                // Spawn signaling auto-reconnect helper that reacts immediately to disconnect events.
651                crate::wire::webrtc::spawn_signaling_reconnector(
652                    self.signaling_client.clone(),
653                    self.shutdown_token.clone(),
654                );
655            }
656            Some(register_response::Result::Error(error)) => {
657                tracing::error!(
658                    severity = 10,
659                    error_category = "registration_error",
660                    error_code = error.code,
661                    "❌ Registration failed: code={}, message={}",
662                    error.code,
663                    error.message
664                );
665                return Err(actr_protocol::ProtocolError::TransportError(format!(
666                    "Registration rejected: {} (code: {})",
667                    error.message, error.code
668                )));
669            }
670            None => {
671                tracing::error!(
672                    severity = 10,
673                    error_category = "registration_error",
674                    "❌ Registration response missing result"
675                );
676                return Err(actr_protocol::ProtocolError::TransportError(
677                    "Invalid registration response: missing result".to_string(),
678                ));
679            }
680        }
681
682        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
683        // 2. Transport layer initialization (completed via WebRTC infrastructure)
684        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
685        tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
686
687        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
688        // 3.1 Convert to Arc (before starting background loops)
689        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
690        // Clone actor_id before moving self into Arc
691        let actor_id = self
692            .actor_id
693            .as_ref()
694            .ok_or_else(|| {
695                actr_protocol::ProtocolError::InvalidStateTransition(
696                    "Actor ID not set - registration must complete before starting node"
697                        .to_string(),
698                )
699            })?
700            .clone();
701        let credential = self
702            .credential
703            .as_ref()
704            .ok_or_else(|| {
705                actr_protocol::ProtocolError::InvalidStateTransition(
706                    "Credential not set - node must be started before handling messages"
707                        .to_string(),
708                )
709            })?
710            .clone();
711
712        let actor_id_for_shell = actor_id.clone();
713        let shutdown_token = self.shutdown_token.clone();
714        let node_ref = Arc::new(self);
715
716        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
717        // 3.5. Start WebRTC background loops (BEFORE on_start)
718        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
719        // CRITICAL: Start signaling loop before on_start() to avoid deadlock
720        // where on_start() tries to send messages but signaling loop isn't running
721        tracing::info!("🚀 Starting WebRTC background loops");
722
723        // Start WebRtcCoordinator signaling loop
724        if let Some(coordinator) = &node_ref.webrtc_coordinator {
725            coordinator.clone().start().await.map_err(|e| {
726                actr_protocol::ProtocolError::TransportError(format!(
727                    "WebRtcCoordinator start failed: {e}"
728                ))
729            })?;
730            tracing::info!("✅ WebRtcCoordinator signaling loop started");
731        }
732
733        // Start WebRtcGate message receive loop (route to Mailbox)
734        if let Some(gate) = &node_ref.webrtc_gate {
735            gate.start_receive_loop(node_ref.mailbox.clone())
736                .await
737                .map_err(|e| {
738                    actr_protocol::ProtocolError::TransportError(format!(
739                        "WebRtcGate receive loop start failed: {e}"
740                    ))
741                })?;
742            tracing::info!("✅ WebRtcGate → Mailbox routing started");
743        }
744
745        tracing::info!("✅ WebRTC background loops started");
746
747        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
748        // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
749        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
750        tracing::info!("🪝 Calling lifecycle hook: on_start");
751
752        let ctx = node_ref
753            .context_factory
754            .as_ref()
755            .expect("ContextFactory must be initialized before on_start")
756            .create(
757                &actor_id,
758                None,        // caller_id
759                "bootstrap", // trace_id
760                "bootstrap", // request_id
761                &credential,
762            );
763        node_ref.workload.on_start(&ctx).await?;
764        tracing::info!("✅ Lifecycle hook on_start completed");
765
766        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
767        // 4.6. Start Inproc receive loop (Shell → Workload)
768        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
769        tracing::info!("🔄 Starting Inproc receive loop (Shell → Workload)");
770        // Start Workload receive loop (Shell → Workload REQUEST)
771        if let Some(shell_to_workload) = &node_ref.inproc_mgr {
772            if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
773                let node = node_ref.clone();
774                let request_rx_lane = shell_to_workload
775                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
776                    .await
777                    .map_err(|e| {
778                        actr_protocol::ProtocolError::TransportError(format!(
779                            "Failed to get Workload receive lane: {e}"
780                        ))
781                    })?;
782                let response_tx = workload_to_shell.clone();
783                let shutdown = shutdown_token.clone();
784
785                let inproc_handle = tokio::spawn(async move {
786                    loop {
787                        tokio::select! {
788                            _ = shutdown.cancelled() => {
789                                tracing::info!("📭 Workload receive loop (Shell → Workload) received shutdown signal");
790                                break;
791                            }
792
793                            envelope_result = request_rx_lane.recv_envelope() => {
794                                match envelope_result {
795                                    Ok(envelope) => {
796                                        let request_id = envelope.request_id.clone();
797                                        tracing::debug!("📨 Workload received REQUEST from Shell: request_id={}", request_id);
798
799                                        // Shell calls have no caller_id (local process communication)
800                                        match node.handle_incoming(envelope.clone(), None).await {
801                                            Ok(response_bytes) => {
802                                                // Send RESPONSE back via workload_to_shell
803                                                // Keep same route_key (no prefix needed - separate channels!)
804                                                let response_envelope = RpcEnvelope {
805                                                    route_key: envelope.route_key.clone(),
806                                                    payload: Some(response_bytes),
807                                                    error: None,
808                                                    trace_id: envelope.trace_id.clone(),
809                                                    request_id: request_id.clone(),
810                                                    metadata: Vec::new(),
811                                                    timeout_ms: 30000,
812                                                };
813
814                                                // Send via Workload → Shell channel
815                                                if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope).await {
816                                                    tracing::error!(
817                                                        severity = 7,
818                                                        error_category = "transport_error",
819                                                        trace_id = %envelope.trace_id,
820                                                        request_id = %request_id,
821                                                        "❌ Failed to send RESPONSE to Shell: {:?}", e
822                                                    );
823                                                }
824                                            }
825                                            Err(e) => {
826                                                tracing::error!(
827                                                    severity = 6,
828                                                    error_category = "handler_error",
829                                                    trace_id = %envelope.trace_id,
830                                                    request_id = %request_id,
831                                                    route_key = %envelope.route_key,
832                                                    "❌ Workload message handling failed: {:?}", e
833                                                );
834
835                                                // Send error response (system-level error on envelope)
836                                                let error_response = actr_protocol::ErrorResponse {
837                                                    code: protocol_error_to_code(&e),
838                                                    message: e.to_string(),
839                                                };
840
841                                                let error_envelope = RpcEnvelope {
842                                                    route_key: envelope.route_key.clone(),
843                                                    payload: None,
844                                                    error: Some(error_response),
845                                                    trace_id: envelope.trace_id.clone(),
846                                                    request_id: request_id.clone(),
847                                                    metadata: Vec::new(),
848                                                    timeout_ms: 30000,
849                                                };
850
851                                                if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope).await {
852                                                    tracing::error!(
853                                                        severity = 7,
854                                                        error_category = "transport_error",
855                                                        trace_id = %envelope.trace_id,
856                                                        request_id = %request_id,
857                                                        "❌ Failed to send ERROR response to Shell: {:?}", e
858                                                    );
859                                                }
860                                            }
861                                        }
862                                    }
863                                    Err(e) => {
864                                        tracing::error!(
865                                            severity = 8,
866                                            error_category = "transport_error",
867                                            "❌ Failed to receive from Shell → Workload lane: {:?}", e
868                                        );
869                                        break;
870                                    }
871                                }
872                            }
873                        }
874                    }
875                    tracing::info!(
876                        "✅ Workload receive loop (Shell → Workload) terminated gracefully"
877                    );
878                });
879
880                task_handles.push(inproc_handle);
881            }
882        }
883        tracing::info!("✅ Workload receive loop (Shell → Workload REQUEST) started");
884
885        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
886        // 4.7. Start Shell receive loop (Workload → Shell RESPONSE)
887        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
888        tracing::info!("🔄 Starting Shell receive loop (Workload → Shell RESPONSE)");
889        if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
890            if let Some(shell_to_workload) = &node_ref.inproc_mgr {
891                let response_rx_lane = workload_to_shell
892                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
893                    .await
894                    .map_err(|e| {
895                        actr_protocol::ProtocolError::TransportError(format!(
896                            "Failed to get Shell receive lane: {e}"
897                        ))
898                    })?;
899                let request_mgr = shell_to_workload.clone();
900                let shutdown = shutdown_token.clone();
901
902                let shell_receive_handle = tokio::spawn(async move {
903                    loop {
904                        tokio::select! {
905                            _ = shutdown.cancelled() => {
906                                tracing::info!("📭 Shell receive loop (Workload → Shell) received shutdown signal");
907                                break;
908                            }
909
910                            envelope_result = response_rx_lane.recv_envelope() => {
911                                match envelope_result {
912                                    Ok(envelope) => {
913                                        tracing::debug!("📨 Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
914
915                                        // Check if response is success or error
916                                        match (envelope.payload, envelope.error) {
917                                            (Some(payload), None) => {
918                                                // Success response
919                                                if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
920                                                    tracing::warn!(
921                                                        severity = 4,
922                                                        error_category = "orphan_response",
923                                                        trace_id = %envelope.trace_id,
924                                                        request_id = %envelope.request_id,
925                                                        "⚠️  No pending request found for response: {:?}", e
926                                                    );
927                                                }
928                                            }
929                                            (None, Some(error)) => {
930                                                // Error response - convert to ProtocolError and complete with error
931                                                let protocol_err = actr_protocol::ProtocolError::TransportError(
932                                                    format!("RPC error {}: {}", error.code, error.message)
933                                                );
934                                                if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
935                                                    tracing::warn!(
936                                                        severity = 4,
937                                                        error_category = "orphan_response",
938                                                        trace_id = %envelope.trace_id,
939                                                        request_id = %envelope.request_id,
940                                                        "⚠️  No pending request found for error response: {:?}", e
941                                                    );
942                                                }
943                                            }
944                                            _ => {
945                                                tracing::error!(
946                                                    severity = 7,
947                                                    error_category = "protocol_error",
948                                                    trace_id = %envelope.trace_id,
949                                                    request_id = %envelope.request_id,
950                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
951                                                );
952                                            }
953                                        }
954                                    }
955                                    Err(e) => {
956                                        tracing::error!(
957                                            severity = 8,
958                                            error_category = "transport_error",
959                                            "❌ Failed to receive from Workload → Shell lane: {:?}", e
960                                        );
961                                        break;
962                                    }
963                                }
964                            }
965                        }
966                    }
967                    tracing::info!(
968                        "✅ Shell receive loop (Workload → Shell) terminated gracefully"
969                    );
970                });
971
972                task_handles.push(shell_receive_handle);
973            }
974        }
975        tracing::info!("✅ Shell receive loop (Workload → Shell RESPONSE) started");
976
977        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
978        // 5. Start Mailbox processing loop (State Path)
979        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
980        tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
981        {
982            let node = node_ref.clone();
983            let mailbox = node_ref.mailbox.clone();
984            let gate = node_ref.webrtc_gate.clone();
985            let shutdown = shutdown_token.clone();
986
987            let mailbox_handle = tokio::spawn(async move {
988                loop {
989                    tokio::select! {
990                        // Listen for shutdown signal
991                        _ = shutdown.cancelled() => {
992                            tracing::info!("📭 Mailbox loop received shutdown signal");
993                            break;
994                        }
995
996                        // Dequeue messages (by priority)
997                        result = mailbox.dequeue() => {
998                            match result {
999                                Ok(messages) => {
1000                                    if messages.is_empty() {
1001                                        // Queue empty, sleep briefly
1002                                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1003                                        continue;
1004                                    }
1005
1006                                    tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
1007
1008                                    // Process messages one by one
1009                                    for msg_record in messages {
1010                                        // Deserialize RpcEnvelope (Protobuf)
1011                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
1012                                            Ok(envelope) => {
1013                                                let request_id = envelope.request_id.clone();
1014                                                tracing::debug!("📦 Processing message: request_id={}", request_id);
1015
1016                                                // Decode caller_id from MessageRecord.from (transport layer)
1017                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
1018                                                let caller_id_ref = caller_id_result.as_ref().ok();
1019
1020                                                if caller_id_ref.is_none() {
1021                                                    tracing::warn!(
1022                                                        trace_id = %envelope.trace_id,
1023                                                        request_id = %request_id,
1024                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
1025                                                    );
1026                                                }
1027
1028                                                // Call handle_incoming with caller_id from transport layer
1029                                                match node.handle_incoming(envelope.clone(), caller_id_ref).await {
1030                                                    Ok(response_bytes) => {
1031                                                        // Send response (reuse request_id)
1032                                                        if let Some(ref gate) = gate {
1033                                                            // Use already decoded caller_id
1034                                                            match caller_id_result {
1035                                                                Ok(caller) => {
1036                                                                    // Construct response RpcEnvelope (reuse request_id!)
1037                                                                    let response_envelope = RpcEnvelope {
1038                                                                        request_id,  // Reuse!
1039                                                                        route_key: envelope.route_key.clone(),
1040                                                                        payload: Some(response_bytes),
1041                                                                        error: None,
1042                                                                        trace_id: envelope.trace_id.clone(),
1043                                                                        metadata: Vec::new(),  // Response doesn't need extra metadata
1044                                                                        timeout_ms: 30000,
1045                                                                    };
1046
1047                                                                    if let Err(e) = gate.send_response(&caller, response_envelope).await {
1048                                                                        tracing::error!(
1049                                                                            severity = 7,
1050                                                                            error_category = "transport_error",
1051                                                                            trace_id = %envelope.trace_id,
1052                                                                            request_id = %envelope.request_id,
1053                                                                            "❌ Failed to send response: {:?}", e
1054                                                                        );
1055                                                                    }
1056                                                                }
1057                                                                Err(e) => {
1058                                                                    tracing::error!(
1059                                                                        severity = 8,
1060                                                                        error_category = "protobuf_decode",
1061                                                                        trace_id = %envelope.trace_id,
1062                                                                        request_id = %envelope.request_id,
1063                                                                        "❌ Failed to decode caller_id: {:?}", e
1064                                                                    );
1065                                                                }
1066                                                            }
1067                                                        }
1068
1069                                                        // ACK message
1070                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
1071                                                            tracing::error!(
1072                                                                severity = 9,
1073                                                                error_category = "mailbox_error",
1074                                                                trace_id = %envelope.trace_id,
1075                                                                request_id = %envelope.request_id,
1076                                                                message_id = %msg_record.id,
1077                                                                "❌ Mailbox ACK failed: {:?}", e
1078                                                            );
1079                                                        }
1080                                                    }
1081                                                    Err(e) => {
1082                                                        tracing::error!(
1083                                                            severity = 6,
1084                                                            error_category = "handler_error",
1085                                                            trace_id = %envelope.trace_id,
1086                                                            request_id = %envelope.request_id,
1087                                                            route_key = %envelope.route_key,
1088                                                            "❌ handle_incoming failed: {:?}", e
1089                                                        );
1090                                                        // ACK to avoid infinite retries
1091                                                        // Application errors are caller's responsibility
1092                                                        let _ = mailbox.ack(msg_record.id).await;
1093                                                    }
1094                                                }
1095                                            }
1096                                            Err(e) => {
1097                                                // Poison message - cannot decode RpcEnvelope
1098                                                tracing::error!(
1099                                                    severity = 9,
1100                                                    error_category = "protobuf_decode",
1101                                                    message_id = %msg_record.id,
1102                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
1103                                                );
1104
1105                                                // Write to Dead Letter Queue
1106                                                use actr_mailbox::DlqRecord;
1107                                                use chrono::Utc;
1108                                                use uuid::Uuid;
1109
1110                                                let dlq_record = DlqRecord {
1111                                                    id: Uuid::new_v4(),
1112                                                    original_message_id: Some(msg_record.id.to_string()),
1113                                                    from: Some(msg_record.from.clone()),
1114                                                    to: node.actor_id.as_ref().map(|id| {
1115                                                        let mut buf = Vec::new();
1116                                                        id.encode(&mut buf).unwrap();
1117                                                        buf
1118                                                    }),
1119                                                    raw_bytes: msg_record.payload.clone(),
1120                                                    error_message: format!("Protobuf decode failed: {e}"),
1121                                                    error_category: "protobuf_decode".to_string(),
1122                                                    trace_id: format!("mailbox-{}", msg_record.id),  // Fallback trace_id
1123                                                    request_id: None,
1124                                                    created_at: Utc::now(),
1125                                                    redrive_attempts: 0,
1126                                                    last_redrive_at: None,
1127                                                    context: Some(format!(
1128                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
1129                                                        match msg_record.priority {
1130                                                            actr_mailbox::MessagePriority::High => "high",
1131                                                            actr_mailbox::MessagePriority::Normal => "normal",
1132                                                        }
1133                                                    )),
1134                                                };
1135
1136                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
1137                                                    tracing::error!(
1138                                                        severity = 10,
1139                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
1140                                                    );
1141                                                } else {
1142                                                    tracing::warn!(
1143                                                        severity = 9,
1144                                                        "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
1145                                                    );
1146                                                }
1147
1148                                                // ACK the poison message to remove from mailbox
1149                                                let _ = mailbox.ack(msg_record.id).await;
1150                                            }
1151                                        }
1152                                    }
1153                                }
1154                                Err(e) => {
1155                                    tracing::error!(
1156                                        severity = 9,
1157                                        error_category = "mailbox_error",
1158                                        "❌ Mailbox dequeue failed: {:?}", e
1159                                    );
1160                                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1161                                }
1162                            }
1163                        }
1164                    }
1165                }
1166                tracing::info!("✅ Mailbox processing loop terminated gracefully");
1167            });
1168
1169            task_handles.push(mailbox_handle);
1170        }
1171        tracing::info!("✅ Mailbox processing loop started");
1172
1173        tracing::info!("✅ ActrNode started successfully");
1174
1175        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1176        // 6. Create ActrRef for Shell to interact with Workload
1177        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1178        use crate::actr_ref::{ActrRef, ActrRefShared};
1179        use crate::outbound::InprocOutGate;
1180
1181        // Create InprocOutGate from shell_to_workload transport manager
1182        let shell_to_workload = node_ref
1183            .inproc_mgr
1184            .clone()
1185            .expect("inproc_mgr must be initialized");
1186        let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
1187
1188        // Create ActrRefShared
1189        let actr_ref_shared = Arc::new(ActrRefShared {
1190            actor_id: actor_id_for_shell.clone(),
1191            inproc_gate,
1192            shutdown_token: shutdown_token.clone(),
1193            task_handles: tokio::sync::Mutex::new(task_handles),
1194        });
1195
1196        // Create ActrRef
1197        let actr_ref = ActrRef::new(actr_ref_shared, node_ref);
1198
1199        tracing::info!("✅ ActrRef created (Shell → Workload communication handle)");
1200
1201        Ok(actr_ref)
1202    }
1203}