actr_runtime/lifecycle/actr_node.rs

1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use actr_framework::{Bytes, Workload};
4use actr_protocol::prost::Message as ProstMessage;
5use actr_protocol::{ActorResult, ActrId, PayloadType, RpcEnvelope};
6use futures_util::FutureExt;
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9
10use crate::context_factory::ContextFactory;
11use crate::transport::InprocTransportManager;
12
13// Use types from sub-crates
14use crate::wire::webrtc::{SignalingClient, WebRtcConfig};
15use actr_mailbox::{DeadLetterQueue, Mailbox};
16use actr_protocol::{AIdCredential, RegisterRequest, register_response};
17
18// Use extension traits from actr-protocol
19use actr_protocol::{ActrIdExt, ActrTypeExt};
20
21/// ActrNode - ActrSystem + Workload (1:1 composition)
22///
23/// # Generic Parameters
24/// - `W`: Workload type
25///
26/// # MessageDispatcher Association
27/// - Statically associated via W::Dispatcher
28/// - Does not store Dispatcher instance (not even ZST needed)
29/// - Dispatch calls entirely through type system
30pub struct ActrNode<W: Workload> {
31    /// Runtime configuration
32    pub(crate) config: actr_config::Config,
33
34    /// Workload instance (the only business logic)
35    pub(crate) workload: Arc<W>,
36
37    /// SQLite persistent mailbox
38    pub(crate) mailbox: Arc<dyn Mailbox>,
39
40    /// Dead Letter Queue for poison messages
41    pub(crate) dlq: Arc<dyn DeadLetterQueue>,
42
43    /// Context factory (created in start() after obtaining ActorId)
44    pub(crate) context_factory: Option<ContextFactory>,
45
46    /// Signaling client
47    pub(crate) signaling_client: Arc<dyn SignalingClient>,
48
49    /// Actor ID (obtained after startup)
50    pub(crate) actor_id: Option<ActrId>,
51
52    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
53    pub(crate) credential: Option<AIdCredential>,
54
55    /// WebRTC coordinator (created after startup)
56    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,
57
58    /// WebRTC Gate (created after startup)
59    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,
60
61    /// Shell β†’ Workload Transport Manager
62    ///
63    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
64    pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,
65
66    /// Workload β†’ Shell Transport Manager
67    ///
68    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
69    pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,
70
71    /// Shutdown token for graceful shutdown
72    pub(crate) shutdown_token: CancellationToken,
73}
74
75/// Map ProtocolError to error code for ErrorResponse
76fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
77    use actr_protocol::ProtocolError;
78    match err {
79        ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
80        ProtocolError::Uri(_) => 400,  // Bad Request - URI parsing error
81        ProtocolError::Name(_) => 400, // Bad Request - invalid name
82        ProtocolError::SerializationError(_) => 500, // Internal Server Error
83        ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
84        ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
85        ProtocolError::EncodeError(_) => 500, // Internal Server Error
86        ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
87        ProtocolError::TransportError(_) => 503, // Service Unavailable
88        ProtocolError::Timeout => 504, // Gateway Timeout
89        ProtocolError::TargetNotFound(_) => 404, // Not Found
90        ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
91        ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
92    }
93}
94
95impl<W: Workload> ActrNode<W> {
96    /// Get Inproc Transport Manager
97    ///
98    /// # Returns
99    /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
100    /// - `None`: Not yet started (need to call start() first)
101    ///
102    /// # Use Cases
103    /// - Workload internals need to communicate with Shell
104    /// - Create custom LatencyFirst/MediaTrack channels
105    pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
106        self.inproc_mgr.clone()
107    }
108
109    /// Handle incoming message envelope
110    ///
111    /// # Performance Analysis
112    /// 1. create_context: ~10ns
113    /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
114    /// 3. User business logic: variable
115    ///
116    /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
117    ///
118    /// # Zero-cost Abstraction
119    /// - Compiler can inline entire call chain
120    /// - Match branches can be directly expanded
121    /// - Final generated code approaches hand-written match expression
122    ///
123    /// # Parameters
124    /// - `envelope`: The RPC envelope containing the message
125    /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
126    ///
127    /// # caller_id Design
128    ///
129    /// **Why not in RpcEnvelope?**
130    /// - Transport layer (WebRTC/Mailbox) already knows the sender
131    /// - All connections are direct P2P (no intermediaries)
132    /// - Storing in envelope would be redundant duplication
133    ///
134    /// **How it works:**
135    /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
136    /// - Only decoded when creating Context (once per message)
137    /// - Shell calls pass `None` (local process, no remote caller)
138    /// - Remote calls decode from `MessageRecord.from`
139    ///
140    /// **trace_id vs request_id:**
141    /// - `trace_id`: Distributed tracing across entire call chain (A β†’ B β†’ C)
142    /// - `request_id`: Unique identifier for each request-response pair
143    /// - Both kept for flexibility in complex scenarios
144    /// - Single-hop calls: effectively identical
145    /// - Multi-hop calls: trace_id spans all hops, request_id per hop
146    pub async fn handle_incoming(
147        &self,
148        envelope: RpcEnvelope,
149        caller_id: Option<&ActrId>,
150    ) -> ActorResult<Bytes> {
151        use actr_framework::MessageDispatcher;
152
153        // Log received message
154        if let Some(caller) = caller_id {
155            tracing::debug!(
156                "πŸ“¨ Handling incoming message: route_key={}, caller={}, trace_id={}, request_id={}",
157                envelope.route_key,
158                caller.to_string_repr(),
159                envelope.trace_id,
160                envelope.request_id
161            );
162        } else {
163            tracing::debug!(
164                "πŸ“¨ Handling incoming message: route_key={}, trace_id={}, request_id={}",
165                envelope.route_key,
166                envelope.trace_id,
167                envelope.request_id
168            );
169        }
170
171        // 1. Create Context with caller_id from transport layer
172        let actor_id = self.actor_id.as_ref().ok_or_else(|| {
173            actr_protocol::ProtocolError::InvalidStateTransition(
174                "Actor ID not set - node must be started before handling messages".to_string(),
175            )
176        })?;
177
178        let ctx = self
179            .context_factory
180            .as_ref()
181            .expect("ContextFactory must be initialized in start()")
182            .create(
183                actor_id,
184                caller_id, // caller_id from transport layer (MessageRecord.from)
185                &envelope.trace_id,
186                &envelope.request_id,
187            );
188
189        // 2. Static MessageRouter dispatch (zero-cost abstraction)
190        // Compiler will inline entire call chain, generating code close to hand-written match
191        //
192        // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
193        let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
194            &self.workload,
195            envelope.clone(),
196            &ctx,
197        ))
198        .catch_unwind()
199        .await;
200
201        let result = match result {
202            Ok(handler_result) => handler_result,
203            Err(panic_payload) => {
204                // Handler panicked - extract panic info
205                let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
206                    s.to_string()
207                } else if let Some(s) = panic_payload.downcast_ref::<String>() {
208                    s.clone()
209                } else {
210                    "Unknown panic payload".to_string()
211                };
212
213                tracing::error!(
214                    severity = 8,
215                    error_category = "handler_panic",
216                    route_key = envelope.route_key,
217                    trace_id = envelope.trace_id,
218                    "❌ Handler panicked: {}",
219                    panic_info
220                );
221
222                // Return DecodeFailure error with panic info
223                // (using DecodeFailure as a proxy for "cannot process message")
224                Err(actr_protocol::ProtocolError::Actr(
225                    actr_protocol::ActrError::DecodeFailure {
226                        message: format!("Handler panicked: {panic_info}"),
227                    },
228                ))
229            }
230        };
231
232        // 3. Log result
233        match &result {
234            Ok(_) => tracing::debug!(
235                trace_id = %envelope.trace_id,
236                request_id = %envelope.request_id,
237                route_key = %envelope.route_key,
238                "βœ… Message handled successfully"
239            ),
240            Err(e) => tracing::error!(
241                severity = 6,
242                error_category = "handler_error",
243                trace_id = %envelope.trace_id,
244                request_id = %envelope.request_id,
245                route_key = %envelope.route_key,
246                "❌ Message handling failed: {:?}", e
247            ),
248        }
249
250        result
251    }
252
253    /// Start the system
254    ///
255    /// # Startup Sequence
256    /// 1. Connect to signaling server and register Actor
257    /// 2. Initialize transport layer (WebRTC)
258    /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
259    /// 4. Start Mailbox processing loop (State Path serial processing)
260    /// 5. Start Transport (begin receiving messages)
261    /// 6. Create ActrRef for Shell to interact with Workload
262    ///
263    /// # Returns
264    /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
265    pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
266        tracing::info!("πŸš€ Starting ActrNode");
267
268        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
269        // 1. Connect to signaling server and register
270        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
271        tracing::info!("πŸ“‘ Connecting to signaling server");
272        self.signaling_client.connect().await.map_err(|e| {
273            actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
274        })?;
275        tracing::info!("βœ… Connected to signaling server");
276
277        // Get ActrType
278        let actr_type = self.workload.actor_type();
279        tracing::info!("πŸ“‹ Actor type: {}", actr_type.to_string_repr());
280
281        // Calculate ServiceSpec from config exports
282        let service_spec = self.config.calculate_service_spec();
283        if let Some(ref spec) = service_spec {
284            tracing::info!("πŸ“¦ Service fingerprint: {}", spec.fingerprint);
285            tracing::info!("πŸ“¦ Service tags: {:?}", spec.tags);
286        } else {
287            tracing::info!("πŸ“¦ No proto exports, ServiceSpec is None");
288        }
289
290        // Construct protobuf RegisterRequest
291        let register_request = RegisterRequest {
292            actr_type: actr_type.clone(),
293            realm: self.config.realm,
294            service_spec,
295            acl: self.config.acl.clone(),
296        };
297
298        tracing::info!("πŸ“€ Registering actor with signaling server (protobuf)");
299
300        // Use send_register_request to send and wait for response
301        let register_response = self
302            .signaling_client
303            .send_register_request(register_request)
304            .await
305            .map_err(|e| {
306                actr_protocol::ProtocolError::TransportError(format!(
307                    "Actor registration failed: {e}"
308                ))
309            })?;
310
311        // Handle RegisterResponse oneof result
312        match register_response.result {
313            Some(register_response::Result::Success(register_ok)) => {
314                let actor_id = register_ok.actr_id;
315                let credential = register_ok.credential;
316
317                tracing::info!("βœ… Registration successful");
318                tracing::info!(
319                    "πŸ†” Assigned ActrId: {}",
320                    actr_protocol::ActrIdExt::to_string_repr(&actor_id)
321                );
322                tracing::info!(
323                    "πŸ” Received credential (token_key_id: {})",
324                    credential.token_key_id
325                );
326                tracing::info!(
327                    "πŸ’“ Signaling heartbeat interval: {} seconds",
328                    register_ok.signaling_heartbeat_interval_secs
329                );
330
331                // Log additional information (if available)
332                if register_ok.psk.is_some() {
333                    tracing::debug!("πŸ”‘ Received PSK (bootstrap keying material)");
334                }
335                if let Some(expires_at) = &register_ok.credential_expires_at {
336                    tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
337                }
338
339                // Store ActrId and Credential
340                self.actor_id = Some(actor_id.clone());
341                self.credential = Some(credential.clone());
342
343                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
344                // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
345                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
346                let shell_to_workload = self
347                    .context_factory
348                    .as_ref()
349                    .expect("ContextFactory must exist")
350                    .shell_to_workload();
351                let workload_to_shell = self
352                    .context_factory
353                    .as_ref()
354                    .expect("ContextFactory must exist")
355                    .workload_to_shell();
356                self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
357                self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
358
359                tracing::info!(
360                    "βœ… Inproc infrastructure already ready (created in ActrSystem::new())"
361                );
362
363                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
364                // 1.5. Create WebRTC infrastructure
365                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
366                tracing::info!("🌐 Initializing WebRTC infrastructure");
367
368                // Get MediaFrameRegistry from ContextFactory
369                let media_frame_registry = self
370                    .context_factory
371                    .as_ref()
372                    .expect("ContextFactory must exist")
373                    .media_frame_registry
374                    .clone();
375
376                // Create WebRtcCoordinator
377                let coordinator =
378                    Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
379                        actor_id.clone(),
380                        credential.clone(),
381                        self.signaling_client.clone(),
382                        WebRtcConfig::default(),
383                        media_frame_registry,
384                    ));
385
386                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
387                // 1.6. Create OutprocTransportManager + OutprocOutGate (ζ–°ζžΆζž„)
388                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
389                tracing::info!("πŸ—οΈ  Creating OutprocTransportManager with WebRTC support");
390
391                // Create DefaultWireBuilder with WebRTC coordinator
392                use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
393                let wire_builder_config = DefaultWireBuilderConfig {
394                    websocket_url_template: None, // WebSocket disabled for now
395                    enable_webrtc: true,
396                    enable_websocket: false,
397                };
398                let wire_builder = Arc::new(DefaultWireBuilder::new(
399                    Some(coordinator.clone()),
400                    wire_builder_config,
401                ));
402
403                // Create OutprocTransportManager
404                use crate::transport::OutprocTransportManager;
405                let transport_manager =
406                    Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
407
408                // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
409                use crate::outbound::{OutGate, OutprocOutGate};
410                let outproc_gate = Arc::new(OutprocOutGate::new(
411                    transport_manager,
412                    Some(coordinator.clone()), // Enable MediaTrack support
413                ));
414                let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
415
416                tracing::info!("βœ… OutprocTransportManager + OutprocOutGate initialized");
417
418                // Get DataStreamRegistry from ContextFactory
419                let data_stream_registry = self
420                    .context_factory
421                    .as_ref()
422                    .expect("ContextFactory must exist")
423                    .data_stream_registry
424                    .clone();
425
426                // Create WebRtcGate with shared pending_requests and DataStreamRegistry
427                let pending_requests = outproc_gate.get_pending_requests();
428                let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
429                    coordinator.clone(),
430                    pending_requests,
431                    data_stream_registry,
432                ));
433
434                // Set local_id
435                gate.set_local_id(actor_id.clone()).await;
436
437                tracing::info!(
438                    "βœ… WebRtcGate created with shared pending_requests and DataStreamRegistry"
439                );
440
441                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
442                // 1.7. Set outproc_gate in ContextFactory (completing initialization)
443                // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
444                tracing::info!("πŸ”§ Setting outproc_gate in ContextFactory");
445                self.context_factory
446                    .as_mut()
447                    .expect("ContextFactory must exist")
448                    .set_outproc_gate(outproc_gate_enum);
449
450                tracing::info!(
451                    "βœ… ContextFactory fully initialized (inproc + outproc gates ready)"
452                );
453
454                // Save references (WebRtcGate kept for backward compatibility if needed)
455                self.webrtc_coordinator = Some(coordinator.clone());
456                self.webrtc_gate = Some(gate.clone());
457
458                tracing::info!("βœ… WebRTC infrastructure initialized");
459            }
460            Some(register_response::Result::Error(error)) => {
461                tracing::error!(
462                    severity = 10,
463                    error_category = "registration_error",
464                    error_code = error.code,
465                    "❌ Registration failed: code={}, message={}",
466                    error.code,
467                    error.message
468                );
469                return Err(actr_protocol::ProtocolError::TransportError(format!(
470                    "Registration rejected: {} (code: {})",
471                    error.message, error.code
472                )));
473            }
474            None => {
475                tracing::error!(
476                    severity = 10,
477                    error_category = "registration_error",
478                    "❌ Registration response missing result"
479                );
480                return Err(actr_protocol::ProtocolError::TransportError(
481                    "Invalid registration response: missing result".to_string(),
482                ));
483            }
484        }
485
486        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
487        // 2. Transport layer initialization (completed via WebRTC infrastructure)
488        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
489        tracing::info!("βœ… Transport layer initialized via WebRTC infrastructure");
490
491        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
492        // 3. Collect task handles before converting to Arc
493        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
494        let mut task_handles = Vec::new();
495
496        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
497        // 3.1 Convert to Arc (before starting background loops)
498        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
499        // Clone actor_id before moving self into Arc
500        let actor_id = self
501            .actor_id
502            .as_ref()
503            .ok_or_else(|| {
504                actr_protocol::ProtocolError::InvalidStateTransition(
505                    "Actor ID not set - registration must complete before starting node"
506                        .to_string(),
507                )
508            })?
509            .clone();
510
511        let actor_id_for_shell = actor_id.clone();
512        let shutdown_token = self.shutdown_token.clone();
513        let node_ref = Arc::new(self);
514
515        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
516        // 3.5. Start WebRTC background loops (BEFORE on_start)
517        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
518        // CRITICAL: Start signaling loop before on_start() to avoid deadlock
519        // where on_start() tries to send messages but signaling loop isn't running
520        tracing::info!("πŸš€ Starting WebRTC background loops");
521
522        // Start WebRtcCoordinator signaling loop
523        if let Some(coordinator) = &node_ref.webrtc_coordinator {
524            coordinator.clone().start().await.map_err(|e| {
525                actr_protocol::ProtocolError::TransportError(format!(
526                    "WebRtcCoordinator start failed: {e}"
527                ))
528            })?;
529            tracing::info!("βœ… WebRtcCoordinator signaling loop started");
530        }
531
532        // Start WebRtcGate message receive loop (route to Mailbox)
533        if let Some(gate) = &node_ref.webrtc_gate {
534            gate.start_receive_loop(node_ref.mailbox.clone())
535                .await
536                .map_err(|e| {
537                    actr_protocol::ProtocolError::TransportError(format!(
538                        "WebRtcGate receive loop start failed: {e}"
539                    ))
540                })?;
541            tracing::info!("βœ… WebRtcGate β†’ Mailbox routing started");
542        }
543
544        tracing::info!("βœ… WebRTC background loops started");
545
546        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
547        // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
548        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
549        tracing::info!("πŸͺ Calling lifecycle hook: on_start");
550
551        let ctx = node_ref
552            .context_factory
553            .as_ref()
554            .expect("ContextFactory must be initialized before on_start")
555            .create(
556                &actor_id,
557                None,        // caller_id
558                "bootstrap", // trace_id
559                "bootstrap", // request_id
560            );
561        node_ref.workload.on_start(&ctx).await?;
562        tracing::info!("βœ… Lifecycle hook on_start completed");
563
564        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
565        // 4.6. Start Inproc receive loop (Shell β†’ Workload)
566        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
567        tracing::info!("πŸ”„ Starting Inproc receive loop (Shell β†’ Workload)");
568        // Start Workload receive loop (Shell β†’ Workload REQUEST)
569        if let Some(shell_to_workload) = &node_ref.inproc_mgr {
570            if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
571                let node = node_ref.clone();
572                let request_rx_lane = shell_to_workload
573                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
574                    .await
575                    .map_err(|e| {
576                        actr_protocol::ProtocolError::TransportError(format!(
577                            "Failed to get Workload receive lane: {e}"
578                        ))
579                    })?;
580                let response_tx = workload_to_shell.clone();
581                let shutdown = shutdown_token.clone();
582
583                let inproc_handle = tokio::spawn(async move {
584                    loop {
585                        tokio::select! {
586                            _ = shutdown.cancelled() => {
587                                tracing::info!("πŸ“­ Workload receive loop (Shell β†’ Workload) received shutdown signal");
588                                break;
589                            }
590
591                            envelope_result = request_rx_lane.recv_envelope() => {
592                                match envelope_result {
593                                    Ok(envelope) => {
594                                        let request_id = envelope.request_id.clone();
595                                        tracing::debug!("πŸ“¨ Workload received REQUEST from Shell: request_id={}", request_id);
596
597                                        // Shell calls have no caller_id (local process communication)
598                                        match node.handle_incoming(envelope.clone(), None).await {
599                                            Ok(response_bytes) => {
600                                                // Send RESPONSE back via workload_to_shell
601                                                // Keep same route_key (no prefix needed - separate channels!)
602                                                let response_envelope = RpcEnvelope {
603                                                    route_key: envelope.route_key.clone(),
604                                                    payload: Some(response_bytes),
605                                                    error: None,
606                                                    trace_id: envelope.trace_id.clone(),
607                                                    request_id: request_id.clone(),
608                                                    metadata: Vec::new(),
609                                                    timeout_ms: 30000,
610                                                };
611
612                                                // Send via Workload β†’ Shell channel
613                                                if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope).await {
614                                                    tracing::error!(
615                                                        severity = 7,
616                                                        error_category = "transport_error",
617                                                        trace_id = %envelope.trace_id,
618                                                        request_id = %request_id,
619                                                        "❌ Failed to send RESPONSE to Shell: {:?}", e
620                                                    );
621                                                }
622                                            }
623                                            Err(e) => {
624                                                tracing::error!(
625                                                    severity = 6,
626                                                    error_category = "handler_error",
627                                                    trace_id = %envelope.trace_id,
628                                                    request_id = %request_id,
629                                                    route_key = %envelope.route_key,
630                                                    "❌ Workload message handling failed: {:?}", e
631                                                );
632
633                                                // Send error response (system-level error on envelope)
634                                                let error_response = actr_protocol::ErrorResponse {
635                                                    code: protocol_error_to_code(&e),
636                                                    message: e.to_string(),
637                                                };
638
639                                                let error_envelope = RpcEnvelope {
640                                                    route_key: envelope.route_key.clone(),
641                                                    payload: None,
642                                                    error: Some(error_response),
643                                                    trace_id: envelope.trace_id.clone(),
644                                                    request_id: request_id.clone(),
645                                                    metadata: Vec::new(),
646                                                    timeout_ms: 30000,
647                                                };
648
649                                                if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope).await {
650                                                    tracing::error!(
651                                                        severity = 7,
652                                                        error_category = "transport_error",
653                                                        trace_id = %envelope.trace_id,
654                                                        request_id = %request_id,
655                                                        "❌ Failed to send ERROR response to Shell: {:?}", e
656                                                    );
657                                                }
658                                            }
659                                        }
660                                    }
661                                    Err(e) => {
662                                        tracing::error!(
663                                            severity = 8,
664                                            error_category = "transport_error",
665                                            "❌ Failed to receive from Shell β†’ Workload lane: {:?}", e
666                                        );
667                                        break;
668                                    }
669                                }
670                            }
671                        }
672                    }
673                    tracing::info!(
674                        "βœ… Workload receive loop (Shell β†’ Workload) terminated gracefully"
675                    );
676                });
677
678                task_handles.push(inproc_handle);
679            }
680        }
681        tracing::info!("βœ… Workload receive loop (Shell β†’ Workload REQUEST) started");
682
683        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
684        // 4.7. Start Shell receive loop (Workload β†’ Shell RESPONSE)
685        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
686        tracing::info!("πŸ”„ Starting Shell receive loop (Workload β†’ Shell RESPONSE)");
687        if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
688            if let Some(shell_to_workload) = &node_ref.inproc_mgr {
689                let response_rx_lane = workload_to_shell
690                    .get_lane(actr_protocol::PayloadType::RpcReliable, None)
691                    .await
692                    .map_err(|e| {
693                        actr_protocol::ProtocolError::TransportError(format!(
694                            "Failed to get Shell receive lane: {e}"
695                        ))
696                    })?;
697                let request_mgr = shell_to_workload.clone();
698                let shutdown = shutdown_token.clone();
699
700                let shell_receive_handle = tokio::spawn(async move {
701                    loop {
702                        tokio::select! {
703                            _ = shutdown.cancelled() => {
704                                tracing::info!("πŸ“­ Shell receive loop (Workload β†’ Shell) received shutdown signal");
705                                break;
706                            }
707
708                            envelope_result = response_rx_lane.recv_envelope() => {
709                                match envelope_result {
710                                    Ok(envelope) => {
711                                        tracing::debug!("πŸ“¨ Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
712
713                                        // Check if response is success or error
714                                        match (envelope.payload, envelope.error) {
715                                            (Some(payload), None) => {
716                                                // Success response
717                                                if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
718                                                    tracing::warn!(
719                                                        severity = 4,
720                                                        error_category = "orphan_response",
721                                                        trace_id = %envelope.trace_id,
722                                                        request_id = %envelope.request_id,
723                                                        "⚠️  No pending request found for response: {:?}", e
724                                                    );
725                                                }
726                                            }
727                                            (None, Some(error)) => {
728                                                // Error response - convert to ProtocolError and complete with error
729                                                let protocol_err = actr_protocol::ProtocolError::TransportError(
730                                                    format!("RPC error {}: {}", error.code, error.message)
731                                                );
732                                                if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
733                                                    tracing::warn!(
734                                                        severity = 4,
735                                                        error_category = "orphan_response",
736                                                        trace_id = %envelope.trace_id,
737                                                        request_id = %envelope.request_id,
738                                                        "⚠️  No pending request found for error response: {:?}", e
739                                                    );
740                                                }
741                                            }
742                                            _ => {
743                                                tracing::error!(
744                                                    severity = 7,
745                                                    error_category = "protocol_error",
746                                                    trace_id = %envelope.trace_id,
747                                                    request_id = %envelope.request_id,
748                                                    "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
749                                                );
750                                            }
751                                        }
752                                    }
753                                    Err(e) => {
754                                        tracing::error!(
755                                            severity = 8,
756                                            error_category = "transport_error",
757                                            "❌ Failed to receive from Workload β†’ Shell lane: {:?}", e
758                                        );
759                                        break;
760                                    }
761                                }
762                            }
763                        }
764                    }
765                    tracing::info!(
766                        "βœ… Shell receive loop (Workload β†’ Shell) terminated gracefully"
767                    );
768                });
769
770                task_handles.push(shell_receive_handle);
771            }
772        }
773        tracing::info!("βœ… Shell receive loop (Workload β†’ Shell RESPONSE) started");
774
775        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
776        // 5. Start Mailbox processing loop (State Path)
777        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
778        tracing::info!("πŸ”„ Starting Mailbox processing loop (State Path)");
779        {
780            let node = node_ref.clone();
781            let mailbox = node_ref.mailbox.clone();
782            let gate = node_ref.webrtc_gate.clone();
783            let shutdown = shutdown_token.clone();
784
785            let mailbox_handle = tokio::spawn(async move {
786                loop {
787                    tokio::select! {
788                        // Listen for shutdown signal
789                        _ = shutdown.cancelled() => {
790                            tracing::info!("πŸ“­ Mailbox loop received shutdown signal");
791                            break;
792                        }
793
794                        // Dequeue messages (by priority)
795                        result = mailbox.dequeue() => {
796                            match result {
797                                Ok(messages) => {
798                                    if messages.is_empty() {
799                                        // Queue empty, sleep briefly
800                                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
801                                        continue;
802                                    }
803
804                                    tracing::debug!("πŸ“¬ Mailbox dequeue: {} messages", messages.len());
805
806                                    // Process messages one by one
807                                    for msg_record in messages {
808                                        // Deserialize RpcEnvelope (Protobuf)
809                                        match RpcEnvelope::decode(&msg_record.payload[..]) {
810                                            Ok(envelope) => {
811                                                let request_id = envelope.request_id.clone();
812                                                tracing::debug!("πŸ“¦ Processing message: request_id={}", request_id);
813
814                                                // Decode caller_id from MessageRecord.from (transport layer)
815                                                let caller_id_result = ActrId::decode(&msg_record.from[..]);
816                                                let caller_id_ref = caller_id_result.as_ref().ok();
817
818                                                if caller_id_ref.is_none() {
819                                                    tracing::warn!(
820                                                        trace_id = %envelope.trace_id,
821                                                        request_id = %request_id,
822                                                        "⚠️  Failed to decode caller_id from MessageRecord.from"
823                                                    );
824                                                }
825
826                                                // Call handle_incoming with caller_id from transport layer
827                                                match node.handle_incoming(envelope.clone(), caller_id_ref).await {
828                                                    Ok(response_bytes) => {
829                                                        // Send response (reuse request_id)
830                                                        if let Some(ref gate) = gate {
831                                                            // Use already decoded caller_id
832                                                            match caller_id_result {
833                                                                Ok(caller) => {
834                                                                    // Construct response RpcEnvelope (reuse request_id!)
835                                                                    let response_envelope = RpcEnvelope {
836                                                                        request_id,  // Reuse!
837                                                                        route_key: envelope.route_key.clone(),
838                                                                        payload: Some(response_bytes),
839                                                                        error: None,
840                                                                        trace_id: envelope.trace_id.clone(),
841                                                                        metadata: Vec::new(),  // Response doesn't need extra metadata
842                                                                        timeout_ms: 30000,
843                                                                    };
844
845                                                                    if let Err(e) = gate.send_response(&caller, response_envelope).await {
846                                                                        tracing::error!(
847                                                                            severity = 7,
848                                                                            error_category = "transport_error",
849                                                                            trace_id = %envelope.trace_id,
850                                                                            request_id = %envelope.request_id,
851                                                                            "❌ Failed to send response: {:?}", e
852                                                                        );
853                                                                    }
854                                                                }
855                                                                Err(e) => {
856                                                                    tracing::error!(
857                                                                        severity = 8,
858                                                                        error_category = "protobuf_decode",
859                                                                        trace_id = %envelope.trace_id,
860                                                                        request_id = %envelope.request_id,
861                                                                        "❌ Failed to decode caller_id: {:?}", e
862                                                                    );
863                                                                }
864                                                            }
865                                                        }
866
867                                                        // ACK message
868                                                        if let Err(e) = mailbox.ack(msg_record.id).await {
869                                                            tracing::error!(
870                                                                severity = 9,
871                                                                error_category = "mailbox_error",
872                                                                trace_id = %envelope.trace_id,
873                                                                request_id = %envelope.request_id,
874                                                                message_id = %msg_record.id,
875                                                                "❌ Mailbox ACK failed: {:?}", e
876                                                            );
877                                                        }
878                                                    }
879                                                    Err(e) => {
880                                                        tracing::error!(
881                                                            severity = 6,
882                                                            error_category = "handler_error",
883                                                            trace_id = %envelope.trace_id,
884                                                            request_id = %envelope.request_id,
885                                                            route_key = %envelope.route_key,
886                                                            "❌ handle_incoming failed: {:?}", e
887                                                        );
888                                                        // ACK to avoid infinite retries
889                                                        // Application errors are caller's responsibility
890                                                        let _ = mailbox.ack(msg_record.id).await;
891                                                    }
892                                                }
893                                            }
894                                            Err(e) => {
895                                                // Poison message - cannot decode RpcEnvelope
896                                                tracing::error!(
897                                                    severity = 9,
898                                                    error_category = "protobuf_decode",
899                                                    message_id = %msg_record.id,
900                                                    "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}", e
901                                                );
902
903                                                // Write to Dead Letter Queue
904                                                use actr_mailbox::DlqRecord;
905                                                use chrono::Utc;
906                                                use uuid::Uuid;
907
908                                                let dlq_record = DlqRecord {
909                                                    id: Uuid::new_v4(),
910                                                    original_message_id: Some(msg_record.id.to_string()),
911                                                    from: Some(msg_record.from.clone()),
912                                                    to: node.actor_id.as_ref().map(|id| {
913                                                        let mut buf = Vec::new();
914                                                        id.encode(&mut buf).unwrap();
915                                                        buf
916                                                    }),
917                                                    raw_bytes: msg_record.payload.clone(),
918                                                    error_message: format!("Protobuf decode failed: {e}"),
919                                                    error_category: "protobuf_decode".to_string(),
920                                                    trace_id: format!("mailbox-{}", msg_record.id),  // Fallback trace_id
921                                                    request_id: None,
922                                                    created_at: Utc::now(),
923                                                    redrive_attempts: 0,
924                                                    last_redrive_at: None,
925                                                    context: Some(format!(
926                                                        r#"{{"source":"mailbox","priority":"{}"}}"#,
927                                                        match msg_record.priority {
928                                                            actr_mailbox::MessagePriority::High => "high",
929                                                            actr_mailbox::MessagePriority::Normal => "normal",
930                                                        }
931                                                    )),
932                                                };
933
934                                                if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
935                                                    tracing::error!(
936                                                        severity = 10,
937                                                        "❌ CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
938                                                    );
939                                                } else {
940                                                    tracing::warn!(
941                                                        severity = 9,
942                                                        "☠️ Poison message moved to DLQ: message_id={}", msg_record.id
943                                                    );
944                                                }
945
946                                                // ACK the poison message to remove from mailbox
947                                                let _ = mailbox.ack(msg_record.id).await;
948                                            }
949                                        }
950                                    }
951                                }
952                                Err(e) => {
953                                    tracing::error!(
954                                        severity = 9,
955                                        error_category = "mailbox_error",
956                                        "❌ Mailbox dequeue failed: {:?}", e
957                                    );
958                                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
959                                }
960                            }
961                        }
962                    }
963                }
964                tracing::info!("βœ… Mailbox processing loop terminated gracefully");
965            });
966
967            task_handles.push(mailbox_handle);
968        }
969        tracing::info!("βœ… Mailbox processing loop started");
970
971        tracing::info!("βœ… ActrNode started successfully");
972
973        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
974        // 6. Create ActrRef for Shell to interact with Workload
975        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
976        use crate::actr_ref::{ActrRef, ActrRefShared};
977        use crate::outbound::InprocOutGate;
978
979        // Create InprocOutGate from shell_to_workload transport manager
980        let shell_to_workload = node_ref
981            .inproc_mgr
982            .clone()
983            .expect("inproc_mgr must be initialized");
984        let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
985
986        // Create ActrRefShared
987        let actr_ref_shared = Arc::new(ActrRefShared {
988            actor_id: actor_id_for_shell.clone(),
989            inproc_gate,
990            shutdown_token: shutdown_token.clone(),
991            task_handles,
992        });
993
994        // Create ActrRef
995        let actr_ref = ActrRef::new(actr_ref_shared);
996
997        tracing::info!("βœ… ActrRef created (Shell β†’ Workload communication handle)");
998
999        Ok(actr_ref)
1000    }
1001}