actr_runtime/lifecycle/actr_node.rs
1//! ActrNode - ActrSystem + Workload (1:1 composition)
2
3use actr_framework::{Bytes, Workload};
4use actr_protocol::prost::Message as ProstMessage;
5use actr_protocol::{ActorResult, ActrId, PayloadType, RpcEnvelope};
6use futures_util::FutureExt;
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9
10use crate::context_factory::ContextFactory;
11use crate::transport::InprocTransportManager;
12
13// Use types from sub-crates
14use crate::wire::webrtc::{SignalingClient, WebRtcConfig};
15use actr_mailbox::{DeadLetterQueue, Mailbox};
16use actr_protocol::{AIdCredential, RegisterRequest, register_response};
17
18// Use extension traits from actr-protocol
19use actr_protocol::{ActrIdExt, ActrTypeExt};
20
/// ActrNode - ActrSystem + Workload (1:1 composition)
///
/// Bundles one workload with the runtime pieces it needs. The `Option` fields for
/// identity, WebRTC and the inproc managers are filled in by `start()`.
///
/// # Generic Parameters
/// - `W`: Workload type
///
/// # MessageDispatcher Association
/// - Statically associated via `W::Dispatcher`
/// - Does not store Dispatcher instance (not even ZST needed)
/// - Dispatch calls entirely through type system
pub struct ActrNode<W: Workload> {
    /// Runtime configuration (realm, ACL and service spec are read from it in `start()`)
    pub(crate) config: actr_config::Config,

    /// Workload instance (the only business logic)
    pub(crate) workload: Arc<W>,

    /// Persistent mailbox (SQLite-backed according to the original note; the
    /// concrete implementation is hidden behind the `Mailbox` trait object)
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages (payloads that fail to decode)
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// Context factory
    ///
    /// `start()` and `handle_incoming()` expect this to be populated already;
    /// `start()` only completes it by installing the outproc gate.
    pub(crate) context_factory: Option<ContextFactory>,

    /// Signaling client
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// Actor ID (assigned by the signaling server during registration in `start()`)
    pub(crate) actor_id: Option<ActrId>,

    /// Actor Credential (obtained after startup, used for subsequent authentication messages)
    pub(crate) credential: Option<AIdCredential>,

    /// WebRTC coordinator (created in `start()`)
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::coordinator::WebRtcCoordinator>>,

    /// WebRTC Gate (created in `start()`)
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// Shell -> Workload Transport Manager
    ///
    /// Workload receives REQUEST from Shell (zero serialization, direct RpcEnvelope passing)
    pub(crate) inproc_mgr: Option<Arc<InprocTransportManager>>,

    /// Workload -> Shell Transport Manager
    ///
    /// Workload sends RESPONSE to Shell (separate pending_requests from Shell's)
    pub(crate) workload_to_shell_mgr: Option<Arc<InprocTransportManager>>,

    /// Shutdown token for graceful shutdown (observed by every background loop)
    pub(crate) shutdown_token: CancellationToken,
}
74
75/// Map ProtocolError to error code for ErrorResponse
76fn protocol_error_to_code(err: &actr_protocol::ProtocolError) -> u32 {
77 use actr_protocol::ProtocolError;
78 match err {
79 ProtocolError::Actr(_) => 400, // Bad Request - identity/decode error
80 ProtocolError::Uri(_) => 400, // Bad Request - URI parsing error
81 ProtocolError::Name(_) => 400, // Bad Request - invalid name
82 ProtocolError::SerializationError(_) => 500, // Internal Server Error
83 ProtocolError::DeserializationError(_) => 400, // Bad Request - invalid payload
84 ProtocolError::DecodeError(_) => 400, // Bad Request - decode failure
85 ProtocolError::EncodeError(_) => 500, // Internal Server Error
86 ProtocolError::UnknownRoute(_) => 404, // Not Found - route not found
87 ProtocolError::TransportError(_) => 503, // Service Unavailable
88 ProtocolError::Timeout => 504, // Gateway Timeout
89 ProtocolError::TargetNotFound(_) => 404, // Not Found
90 ProtocolError::TargetUnavailable(_) => 503, // Service Unavailable
91 ProtocolError::InvalidStateTransition(_) => 500, // Internal Server Error
92 }
93}
94
95impl<W: Workload> ActrNode<W> {
96 /// Get Inproc Transport Manager
97 ///
98 /// # Returns
99 /// - `Some(Arc<InprocTransportManager>)`: Initialized manager
100 /// - `None`: Not yet started (need to call start() first)
101 ///
102 /// # Use Cases
103 /// - Workload internals need to communicate with Shell
104 /// - Create custom LatencyFirst/MediaTrack channels
105 pub fn inproc_mgr(&self) -> Option<Arc<InprocTransportManager>> {
106 self.inproc_mgr.clone()
107 }
108
109 /// Handle incoming message envelope
110 ///
111 /// # Performance Analysis
112 /// 1. create_context: ~10ns
113 /// 2. W::Dispatcher::dispatch: ~5-10ns (static match, can be inlined)
114 /// 3. User business logic: variable
115 ///
116 /// Framework overhead: ~15-20ns (compared to 50-100ns in traditional approaches)
117 ///
118 /// # Zero-cost Abstraction
119 /// - Compiler can inline entire call chain
120 /// - Match branches can be directly expanded
121 /// - Final generated code approaches hand-written match expression
122 ///
123 /// # Parameters
124 /// - `envelope`: The RPC envelope containing the message
125 /// - `caller_id`: The ActrId of the caller (from transport layer, None for local Shell calls)
126 ///
127 /// # caller_id Design
128 ///
129 /// **Why not in RpcEnvelope?**
130 /// - Transport layer (WebRTC/Mailbox) already knows the sender
131 /// - All connections are direct P2P (no intermediaries)
132 /// - Storing in envelope would be redundant duplication
133 ///
134 /// **How it works:**
135 /// - WebRTC/Mailbox stores sender in `MessageRecord.from` (Protobuf bytes)
136 /// - Only decoded when creating Context (once per message)
137 /// - Shell calls pass `None` (local process, no remote caller)
138 /// - Remote calls decode from `MessageRecord.from`
139 ///
140 /// **trace_id vs request_id:**
141 /// - `trace_id`: Distributed tracing across entire call chain (A β B β C)
142 /// - `request_id`: Unique identifier for each request-response pair
143 /// - Both kept for flexibility in complex scenarios
144 /// - Single-hop calls: effectively identical
145 /// - Multi-hop calls: trace_id spans all hops, request_id per hop
146 pub async fn handle_incoming(
147 &self,
148 envelope: RpcEnvelope,
149 caller_id: Option<&ActrId>,
150 ) -> ActorResult<Bytes> {
151 use actr_framework::MessageDispatcher;
152
153 // Log received message
154 if let Some(caller) = caller_id {
155 tracing::debug!(
156 "π¨ Handling incoming message: route_key={}, caller={}, trace_id={}, request_id={}",
157 envelope.route_key,
158 caller.to_string_repr(),
159 envelope.trace_id,
160 envelope.request_id
161 );
162 } else {
163 tracing::debug!(
164 "π¨ Handling incoming message: route_key={}, trace_id={}, request_id={}",
165 envelope.route_key,
166 envelope.trace_id,
167 envelope.request_id
168 );
169 }
170
171 // 1. Create Context with caller_id from transport layer
172 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
173 actr_protocol::ProtocolError::InvalidStateTransition(
174 "Actor ID not set - node must be started before handling messages".to_string(),
175 )
176 })?;
177
178 let ctx = self
179 .context_factory
180 .as_ref()
181 .expect("ContextFactory must be initialized in start()")
182 .create(
183 actor_id,
184 caller_id, // caller_id from transport layer (MessageRecord.from)
185 &envelope.trace_id,
186 &envelope.request_id,
187 );
188
189 // 2. Static MessageRouter dispatch (zero-cost abstraction)
190 // Compiler will inline entire call chain, generating code close to hand-written match
191 //
192 // Wrap dispatch in panic catching to prevent handler panics from crashing the runtime
193 let result = std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(
194 &self.workload,
195 envelope.clone(),
196 &ctx,
197 ))
198 .catch_unwind()
199 .await;
200
201 let result = match result {
202 Ok(handler_result) => handler_result,
203 Err(panic_payload) => {
204 // Handler panicked - extract panic info
205 let panic_info = if let Some(s) = panic_payload.downcast_ref::<&str>() {
206 s.to_string()
207 } else if let Some(s) = panic_payload.downcast_ref::<String>() {
208 s.clone()
209 } else {
210 "Unknown panic payload".to_string()
211 };
212
213 tracing::error!(
214 severity = 8,
215 error_category = "handler_panic",
216 route_key = envelope.route_key,
217 trace_id = envelope.trace_id,
218 "β Handler panicked: {}",
219 panic_info
220 );
221
222 // Return DecodeFailure error with panic info
223 // (using DecodeFailure as a proxy for "cannot process message")
224 Err(actr_protocol::ProtocolError::Actr(
225 actr_protocol::ActrError::DecodeFailure {
226 message: format!("Handler panicked: {panic_info}"),
227 },
228 ))
229 }
230 };
231
232 // 3. Log result
233 match &result {
234 Ok(_) => tracing::debug!(
235 trace_id = %envelope.trace_id,
236 request_id = %envelope.request_id,
237 route_key = %envelope.route_key,
238 "β
Message handled successfully"
239 ),
240 Err(e) => tracing::error!(
241 severity = 6,
242 error_category = "handler_error",
243 trace_id = %envelope.trace_id,
244 request_id = %envelope.request_id,
245 route_key = %envelope.route_key,
246 "β Message handling failed: {:?}", e
247 ),
248 }
249
250 result
251 }
252
253 /// Start the system
254 ///
255 /// # Startup Sequence
256 /// 1. Connect to signaling server and register Actor
257 /// 2. Initialize transport layer (WebRTC)
258 /// 3. Call lifecycle hook on_start (if Lifecycle trait is implemented)
259 /// 4. Start Mailbox processing loop (State Path serial processing)
260 /// 5. Start Transport (begin receiving messages)
261 /// 6. Create ActrRef for Shell to interact with Workload
262 ///
263 /// # Returns
264 /// - `ActrRef<W>`: Lightweight reference for Shell to call Workload methods
265 pub async fn start(mut self) -> ActorResult<crate::actr_ref::ActrRef<W>> {
266 tracing::info!("π Starting ActrNode");
267
268 // βββββββββββββββββββββββββββββββββββββββββββ
269 // 1. Connect to signaling server and register
270 // βββββββββββββββββββββββββββββββββββββββββββ
271 tracing::info!("π‘ Connecting to signaling server");
272 self.signaling_client.connect().await.map_err(|e| {
273 actr_protocol::ProtocolError::TransportError(format!("Signaling connect failed: {e}"))
274 })?;
275 tracing::info!("β
Connected to signaling server");
276
277 // Get ActrType
278 let actr_type = self.workload.actor_type();
279 tracing::info!("π Actor type: {}", actr_type.to_string_repr());
280
281 // Calculate ServiceSpec from config exports
282 let service_spec = self.config.calculate_service_spec();
283 if let Some(ref spec) = service_spec {
284 tracing::info!("π¦ Service fingerprint: {}", spec.fingerprint);
285 tracing::info!("π¦ Service tags: {:?}", spec.tags);
286 } else {
287 tracing::info!("π¦ No proto exports, ServiceSpec is None");
288 }
289
290 // Construct protobuf RegisterRequest
291 let register_request = RegisterRequest {
292 actr_type: actr_type.clone(),
293 realm: self.config.realm,
294 service_spec,
295 acl: self.config.acl.clone(),
296 };
297
298 tracing::info!("π€ Registering actor with signaling server (protobuf)");
299
300 // Use send_register_request to send and wait for response
301 let register_response = self
302 .signaling_client
303 .send_register_request(register_request)
304 .await
305 .map_err(|e| {
306 actr_protocol::ProtocolError::TransportError(format!(
307 "Actor registration failed: {e}"
308 ))
309 })?;
310
311 // Handle RegisterResponse oneof result
312 match register_response.result {
313 Some(register_response::Result::Success(register_ok)) => {
314 let actor_id = register_ok.actr_id;
315 let credential = register_ok.credential;
316
317 tracing::info!("β
Registration successful");
318 tracing::info!(
319 "π Assigned ActrId: {}",
320 actr_protocol::ActrIdExt::to_string_repr(&actor_id)
321 );
322 tracing::info!(
323 "π Received credential (token_key_id: {})",
324 credential.token_key_id
325 );
326 tracing::info!(
327 "π Signaling heartbeat interval: {} seconds",
328 register_ok.signaling_heartbeat_interval_secs
329 );
330
331 // Log additional information (if available)
332 if register_ok.psk.is_some() {
333 tracing::debug!("π Received PSK (bootstrap keying material)");
334 }
335 if let Some(expires_at) = ®ister_ok.credential_expires_at {
336 tracing::debug!("β° Credential expires at: {}s", expires_at.seconds);
337 }
338
339 // Store ActrId and Credential
340 self.actor_id = Some(actor_id.clone());
341 self.credential = Some(credential.clone());
342
343 // βββββββββββββββββββββββββββββββββββββββββββ
344 // 1.3. Store references to both inproc managers (already created in ActrSystem::new())
345 // βββββββββββββββββββββββββββββββββββββββββββ
346 let shell_to_workload = self
347 .context_factory
348 .as_ref()
349 .expect("ContextFactory must exist")
350 .shell_to_workload();
351 let workload_to_shell = self
352 .context_factory
353 .as_ref()
354 .expect("ContextFactory must exist")
355 .workload_to_shell();
356 self.inproc_mgr = Some(shell_to_workload); // Workload receives from this
357 self.workload_to_shell_mgr = Some(workload_to_shell); // Workload sends to this
358
359 tracing::info!(
360 "β
Inproc infrastructure already ready (created in ActrSystem::new())"
361 );
362
363 // βββββββββββββββββββββββββββββββββββββββββββ
364 // 1.5. Create WebRTC infrastructure
365 // βββββββββββββββββββββββββββββββββββββββββββ
366 tracing::info!("π Initializing WebRTC infrastructure");
367
368 // Get MediaFrameRegistry from ContextFactory
369 let media_frame_registry = self
370 .context_factory
371 .as_ref()
372 .expect("ContextFactory must exist")
373 .media_frame_registry
374 .clone();
375
376 // Create WebRtcCoordinator
377 let coordinator =
378 Arc::new(crate::wire::webrtc::coordinator::WebRtcCoordinator::new(
379 actor_id.clone(),
380 credential.clone(),
381 self.signaling_client.clone(),
382 WebRtcConfig::default(),
383 media_frame_registry,
384 ));
385
386 // βββββββββββββββββββββββββββββββββββββββββββ
387 // 1.6. Create OutprocTransportManager + OutprocOutGate (ζ°ζΆζ)
388 // βββββββββββββββββββββββββββββββββββββββββββ
389 tracing::info!("ποΈ Creating OutprocTransportManager with WebRTC support");
390
391 // Create DefaultWireBuilder with WebRTC coordinator
392 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
393 let wire_builder_config = DefaultWireBuilderConfig {
394 websocket_url_template: None, // WebSocket disabled for now
395 enable_webrtc: true,
396 enable_websocket: false,
397 };
398 let wire_builder = Arc::new(DefaultWireBuilder::new(
399 Some(coordinator.clone()),
400 wire_builder_config,
401 ));
402
403 // Create OutprocTransportManager
404 use crate::transport::OutprocTransportManager;
405 let transport_manager =
406 Arc::new(OutprocTransportManager::new(actor_id.clone(), wire_builder));
407
408 // Create OutprocOutGate with WebRTC coordinator for MediaTrack support
409 use crate::outbound::{OutGate, OutprocOutGate};
410 let outproc_gate = Arc::new(OutprocOutGate::new(
411 transport_manager,
412 Some(coordinator.clone()), // Enable MediaTrack support
413 ));
414 let outproc_gate_enum = OutGate::OutprocOut(outproc_gate.clone());
415
416 tracing::info!("β
OutprocTransportManager + OutprocOutGate initialized");
417
418 // Get DataStreamRegistry from ContextFactory
419 let data_stream_registry = self
420 .context_factory
421 .as_ref()
422 .expect("ContextFactory must exist")
423 .data_stream_registry
424 .clone();
425
426 // Create WebRtcGate with shared pending_requests and DataStreamRegistry
427 let pending_requests = outproc_gate.get_pending_requests();
428 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
429 coordinator.clone(),
430 pending_requests,
431 data_stream_registry,
432 ));
433
434 // Set local_id
435 gate.set_local_id(actor_id.clone()).await;
436
437 tracing::info!(
438 "β
WebRtcGate created with shared pending_requests and DataStreamRegistry"
439 );
440
441 // βββββββββββββββββββββββββββββββββββββββββββ
442 // 1.7. Set outproc_gate in ContextFactory (completing initialization)
443 // βββββββββββββββββββββββββββββββββββββββββββ
444 tracing::info!("π§ Setting outproc_gate in ContextFactory");
445 self.context_factory
446 .as_mut()
447 .expect("ContextFactory must exist")
448 .set_outproc_gate(outproc_gate_enum);
449
450 tracing::info!(
451 "β
ContextFactory fully initialized (inproc + outproc gates ready)"
452 );
453
454 // Save references (WebRtcGate kept for backward compatibility if needed)
455 self.webrtc_coordinator = Some(coordinator.clone());
456 self.webrtc_gate = Some(gate.clone());
457
458 tracing::info!("β
WebRTC infrastructure initialized");
459 }
460 Some(register_response::Result::Error(error)) => {
461 tracing::error!(
462 severity = 10,
463 error_category = "registration_error",
464 error_code = error.code,
465 "β Registration failed: code={}, message={}",
466 error.code,
467 error.message
468 );
469 return Err(actr_protocol::ProtocolError::TransportError(format!(
470 "Registration rejected: {} (code: {})",
471 error.message, error.code
472 )));
473 }
474 None => {
475 tracing::error!(
476 severity = 10,
477 error_category = "registration_error",
478 "β Registration response missing result"
479 );
480 return Err(actr_protocol::ProtocolError::TransportError(
481 "Invalid registration response: missing result".to_string(),
482 ));
483 }
484 }
485
486 // βββββββββββββββββββββββββββββββββββββββββββ
487 // 2. Transport layer initialization (completed via WebRTC infrastructure)
488 // βββββββββββββββββββββββββββββββββββββββββββ
489 tracing::info!("β
Transport layer initialized via WebRTC infrastructure");
490
491 // βββββββββββββββββββββββββββββββββββββββββββ
492 // 3. Collect task handles before converting to Arc
493 // βββββββββββββββββββββββββββββββββββββββββββ
494 let mut task_handles = Vec::new();
495
496 // βββββββββββββββββββββββββββββββββββββββββββ
497 // 3.1 Convert to Arc (before starting background loops)
498 // βββββββββββββββββββββββββββββββββββββββββββ
499 // Clone actor_id before moving self into Arc
500 let actor_id = self
501 .actor_id
502 .as_ref()
503 .ok_or_else(|| {
504 actr_protocol::ProtocolError::InvalidStateTransition(
505 "Actor ID not set - registration must complete before starting node"
506 .to_string(),
507 )
508 })?
509 .clone();
510
511 let actor_id_for_shell = actor_id.clone();
512 let shutdown_token = self.shutdown_token.clone();
513 let node_ref = Arc::new(self);
514
515 // βββββββββββββββββββββββββββββββββββββββββββ
516 // 3.5. Start WebRTC background loops (BEFORE on_start)
517 // βββββββββββββββββββββββββββββββββββββββββββ
518 // CRITICAL: Start signaling loop before on_start() to avoid deadlock
519 // where on_start() tries to send messages but signaling loop isn't running
520 tracing::info!("π Starting WebRTC background loops");
521
522 // Start WebRtcCoordinator signaling loop
523 if let Some(coordinator) = &node_ref.webrtc_coordinator {
524 coordinator.clone().start().await.map_err(|e| {
525 actr_protocol::ProtocolError::TransportError(format!(
526 "WebRtcCoordinator start failed: {e}"
527 ))
528 })?;
529 tracing::info!("β
WebRtcCoordinator signaling loop started");
530 }
531
532 // Start WebRtcGate message receive loop (route to Mailbox)
533 if let Some(gate) = &node_ref.webrtc_gate {
534 gate.start_receive_loop(node_ref.mailbox.clone())
535 .await
536 .map_err(|e| {
537 actr_protocol::ProtocolError::TransportError(format!(
538 "WebRtcGate receive loop start failed: {e}"
539 ))
540 })?;
541 tracing::info!("β
WebRtcGate β Mailbox routing started");
542 }
543
544 tracing::info!("β
WebRTC background loops started");
545
546 // βββββββββββββββββββββββββββββββββββββββββββ
547 // 4. Call lifecycle hook on_start (AFTER WebRTC loops are running)
548 // βββββββββββββββββββββββββββββββββββββββββββ
549 tracing::info!("πͺ Calling lifecycle hook: on_start");
550
551 let ctx = node_ref
552 .context_factory
553 .as_ref()
554 .expect("ContextFactory must be initialized before on_start")
555 .create(
556 &actor_id,
557 None, // caller_id
558 "bootstrap", // trace_id
559 "bootstrap", // request_id
560 );
561 node_ref.workload.on_start(&ctx).await?;
562 tracing::info!("β
Lifecycle hook on_start completed");
563
564 // βββββββββββββββββββββββββββββββββββββββββββ
565 // 4.6. Start Inproc receive loop (Shell β Workload)
566 // βββββββββββββββββββββββββββββββββββββββββββ
567 tracing::info!("π Starting Inproc receive loop (Shell β Workload)");
568 // Start Workload receive loop (Shell β Workload REQUEST)
569 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
570 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
571 let node = node_ref.clone();
572 let request_rx_lane = shell_to_workload
573 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
574 .await
575 .map_err(|e| {
576 actr_protocol::ProtocolError::TransportError(format!(
577 "Failed to get Workload receive lane: {e}"
578 ))
579 })?;
580 let response_tx = workload_to_shell.clone();
581 let shutdown = shutdown_token.clone();
582
583 let inproc_handle = tokio::spawn(async move {
584 loop {
585 tokio::select! {
586 _ = shutdown.cancelled() => {
587 tracing::info!("π Workload receive loop (Shell β Workload) received shutdown signal");
588 break;
589 }
590
591 envelope_result = request_rx_lane.recv_envelope() => {
592 match envelope_result {
593 Ok(envelope) => {
594 let request_id = envelope.request_id.clone();
595 tracing::debug!("π¨ Workload received REQUEST from Shell: request_id={}", request_id);
596
597 // Shell calls have no caller_id (local process communication)
598 match node.handle_incoming(envelope.clone(), None).await {
599 Ok(response_bytes) => {
600 // Send RESPONSE back via workload_to_shell
601 // Keep same route_key (no prefix needed - separate channels!)
602 let response_envelope = RpcEnvelope {
603 route_key: envelope.route_key.clone(),
604 payload: Some(response_bytes),
605 error: None,
606 trace_id: envelope.trace_id.clone(),
607 request_id: request_id.clone(),
608 metadata: Vec::new(),
609 timeout_ms: 30000,
610 };
611
612 // Send via Workload β Shell channel
613 if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope).await {
614 tracing::error!(
615 severity = 7,
616 error_category = "transport_error",
617 trace_id = %envelope.trace_id,
618 request_id = %request_id,
619 "β Failed to send RESPONSE to Shell: {:?}", e
620 );
621 }
622 }
623 Err(e) => {
624 tracing::error!(
625 severity = 6,
626 error_category = "handler_error",
627 trace_id = %envelope.trace_id,
628 request_id = %request_id,
629 route_key = %envelope.route_key,
630 "β Workload message handling failed: {:?}", e
631 );
632
633 // Send error response (system-level error on envelope)
634 let error_response = actr_protocol::ErrorResponse {
635 code: protocol_error_to_code(&e),
636 message: e.to_string(),
637 };
638
639 let error_envelope = RpcEnvelope {
640 route_key: envelope.route_key.clone(),
641 payload: None,
642 error: Some(error_response),
643 trace_id: envelope.trace_id.clone(),
644 request_id: request_id.clone(),
645 metadata: Vec::new(),
646 timeout_ms: 30000,
647 };
648
649 if let Err(e) = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope).await {
650 tracing::error!(
651 severity = 7,
652 error_category = "transport_error",
653 trace_id = %envelope.trace_id,
654 request_id = %request_id,
655 "β Failed to send ERROR response to Shell: {:?}", e
656 );
657 }
658 }
659 }
660 }
661 Err(e) => {
662 tracing::error!(
663 severity = 8,
664 error_category = "transport_error",
665 "β Failed to receive from Shell β Workload lane: {:?}", e
666 );
667 break;
668 }
669 }
670 }
671 }
672 }
673 tracing::info!(
674 "β
Workload receive loop (Shell β Workload) terminated gracefully"
675 );
676 });
677
678 task_handles.push(inproc_handle);
679 }
680 }
681 tracing::info!("β
Workload receive loop (Shell β Workload REQUEST) started");
682
683 // βββββββββββββββββββββββββββββββββββββββββββ
684 // 4.7. Start Shell receive loop (Workload β Shell RESPONSE)
685 // βββββββββββββββββββββββββββββββββββββββββββ
686 tracing::info!("π Starting Shell receive loop (Workload β Shell RESPONSE)");
687 if let Some(workload_to_shell) = &node_ref.workload_to_shell_mgr {
688 if let Some(shell_to_workload) = &node_ref.inproc_mgr {
689 let response_rx_lane = workload_to_shell
690 .get_lane(actr_protocol::PayloadType::RpcReliable, None)
691 .await
692 .map_err(|e| {
693 actr_protocol::ProtocolError::TransportError(format!(
694 "Failed to get Shell receive lane: {e}"
695 ))
696 })?;
697 let request_mgr = shell_to_workload.clone();
698 let shutdown = shutdown_token.clone();
699
700 let shell_receive_handle = tokio::spawn(async move {
701 loop {
702 tokio::select! {
703 _ = shutdown.cancelled() => {
704 tracing::info!("π Shell receive loop (Workload β Shell) received shutdown signal");
705 break;
706 }
707
708 envelope_result = response_rx_lane.recv_envelope() => {
709 match envelope_result {
710 Ok(envelope) => {
711 tracing::debug!("π¨ Shell received RESPONSE from Workload: request_id={}", envelope.request_id);
712
713 // Check if response is success or error
714 match (envelope.payload, envelope.error) {
715 (Some(payload), None) => {
716 // Success response
717 if let Err(e) = request_mgr.complete_response(&envelope.request_id, payload).await {
718 tracing::warn!(
719 severity = 4,
720 error_category = "orphan_response",
721 trace_id = %envelope.trace_id,
722 request_id = %envelope.request_id,
723 "β οΈ No pending request found for response: {:?}", e
724 );
725 }
726 }
727 (None, Some(error)) => {
728 // Error response - convert to ProtocolError and complete with error
729 let protocol_err = actr_protocol::ProtocolError::TransportError(
730 format!("RPC error {}: {}", error.code, error.message)
731 );
732 if let Err(e) = request_mgr.complete_error(&envelope.request_id, protocol_err).await {
733 tracing::warn!(
734 severity = 4,
735 error_category = "orphan_response",
736 trace_id = %envelope.trace_id,
737 request_id = %envelope.request_id,
738 "β οΈ No pending request found for error response: {:?}", e
739 );
740 }
741 }
742 _ => {
743 tracing::error!(
744 severity = 7,
745 error_category = "protocol_error",
746 trace_id = %envelope.trace_id,
747 request_id = %envelope.request_id,
748 "β Invalid RpcEnvelope: both payload and error are present or both absent"
749 );
750 }
751 }
752 }
753 Err(e) => {
754 tracing::error!(
755 severity = 8,
756 error_category = "transport_error",
757 "β Failed to receive from Workload β Shell lane: {:?}", e
758 );
759 break;
760 }
761 }
762 }
763 }
764 }
765 tracing::info!(
766 "β
Shell receive loop (Workload β Shell) terminated gracefully"
767 );
768 });
769
770 task_handles.push(shell_receive_handle);
771 }
772 }
773 tracing::info!("β
Shell receive loop (Workload β Shell RESPONSE) started");
774
775 // βββββββββββββββββββββββββββββββββββββββββββ
776 // 5. Start Mailbox processing loop (State Path)
777 // βββββββββββββββββββββββββββββββββββββββββββ
778 tracing::info!("π Starting Mailbox processing loop (State Path)");
779 {
780 let node = node_ref.clone();
781 let mailbox = node_ref.mailbox.clone();
782 let gate = node_ref.webrtc_gate.clone();
783 let shutdown = shutdown_token.clone();
784
785 let mailbox_handle = tokio::spawn(async move {
786 loop {
787 tokio::select! {
788 // Listen for shutdown signal
789 _ = shutdown.cancelled() => {
790 tracing::info!("π Mailbox loop received shutdown signal");
791 break;
792 }
793
794 // Dequeue messages (by priority)
795 result = mailbox.dequeue() => {
796 match result {
797 Ok(messages) => {
798 if messages.is_empty() {
799 // Queue empty, sleep briefly
800 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
801 continue;
802 }
803
804 tracing::debug!("π¬ Mailbox dequeue: {} messages", messages.len());
805
806 // Process messages one by one
807 for msg_record in messages {
808 // Deserialize RpcEnvelope (Protobuf)
809 match RpcEnvelope::decode(&msg_record.payload[..]) {
810 Ok(envelope) => {
811 let request_id = envelope.request_id.clone();
812 tracing::debug!("π¦ Processing message: request_id={}", request_id);
813
814 // Decode caller_id from MessageRecord.from (transport layer)
815 let caller_id_result = ActrId::decode(&msg_record.from[..]);
816 let caller_id_ref = caller_id_result.as_ref().ok();
817
818 if caller_id_ref.is_none() {
819 tracing::warn!(
820 trace_id = %envelope.trace_id,
821 request_id = %request_id,
822 "β οΈ Failed to decode caller_id from MessageRecord.from"
823 );
824 }
825
826 // Call handle_incoming with caller_id from transport layer
827 match node.handle_incoming(envelope.clone(), caller_id_ref).await {
828 Ok(response_bytes) => {
829 // Send response (reuse request_id)
830 if let Some(ref gate) = gate {
831 // Use already decoded caller_id
832 match caller_id_result {
833 Ok(caller) => {
834 // Construct response RpcEnvelope (reuse request_id!)
835 let response_envelope = RpcEnvelope {
836 request_id, // Reuse!
837 route_key: envelope.route_key.clone(),
838 payload: Some(response_bytes),
839 error: None,
840 trace_id: envelope.trace_id.clone(),
841 metadata: Vec::new(), // Response doesn't need extra metadata
842 timeout_ms: 30000,
843 };
844
845 if let Err(e) = gate.send_response(&caller, response_envelope).await {
846 tracing::error!(
847 severity = 7,
848 error_category = "transport_error",
849 trace_id = %envelope.trace_id,
850 request_id = %envelope.request_id,
851 "β Failed to send response: {:?}", e
852 );
853 }
854 }
855 Err(e) => {
856 tracing::error!(
857 severity = 8,
858 error_category = "protobuf_decode",
859 trace_id = %envelope.trace_id,
860 request_id = %envelope.request_id,
861 "β Failed to decode caller_id: {:?}", e
862 );
863 }
864 }
865 }
866
867 // ACK message
868 if let Err(e) = mailbox.ack(msg_record.id).await {
869 tracing::error!(
870 severity = 9,
871 error_category = "mailbox_error",
872 trace_id = %envelope.trace_id,
873 request_id = %envelope.request_id,
874 message_id = %msg_record.id,
875 "β Mailbox ACK failed: {:?}", e
876 );
877 }
878 }
879 Err(e) => {
880 tracing::error!(
881 severity = 6,
882 error_category = "handler_error",
883 trace_id = %envelope.trace_id,
884 request_id = %envelope.request_id,
885 route_key = %envelope.route_key,
886 "β handle_incoming failed: {:?}", e
887 );
888 // ACK to avoid infinite retries
889 // Application errors are caller's responsibility
890 let _ = mailbox.ack(msg_record.id).await;
891 }
892 }
893 }
894 Err(e) => {
895 // Poison message - cannot decode RpcEnvelope
896 tracing::error!(
897 severity = 9,
898 error_category = "protobuf_decode",
899 message_id = %msg_record.id,
900 "β Poison message: Failed to deserialize RpcEnvelope: {:?}", e
901 );
902
903 // Write to Dead Letter Queue
904 use actr_mailbox::DlqRecord;
905 use chrono::Utc;
906 use uuid::Uuid;
907
908 let dlq_record = DlqRecord {
909 id: Uuid::new_v4(),
910 original_message_id: Some(msg_record.id.to_string()),
911 from: Some(msg_record.from.clone()),
912 to: node.actor_id.as_ref().map(|id| {
913 let mut buf = Vec::new();
914 id.encode(&mut buf).unwrap();
915 buf
916 }),
917 raw_bytes: msg_record.payload.clone(),
918 error_message: format!("Protobuf decode failed: {e}"),
919 error_category: "protobuf_decode".to_string(),
920 trace_id: format!("mailbox-{}", msg_record.id), // Fallback trace_id
921 request_id: None,
922 created_at: Utc::now(),
923 redrive_attempts: 0,
924 last_redrive_at: None,
925 context: Some(format!(
926 r#"{{"source":"mailbox","priority":"{}"}}"#,
927 match msg_record.priority {
928 actr_mailbox::MessagePriority::High => "high",
929 actr_mailbox::MessagePriority::Normal => "normal",
930 }
931 )),
932 };
933
934 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
935 tracing::error!(
936 severity = 10,
937 "β CRITICAL: Failed to write poison message to DLQ: {:?}", dlq_err
938 );
939 } else {
940 tracing::warn!(
941 severity = 9,
942 "β οΈ Poison message moved to DLQ: message_id={}", msg_record.id
943 );
944 }
945
946 // ACK the poison message to remove from mailbox
947 let _ = mailbox.ack(msg_record.id).await;
948 }
949 }
950 }
951 }
952 Err(e) => {
953 tracing::error!(
954 severity = 9,
955 error_category = "mailbox_error",
956 "β Mailbox dequeue failed: {:?}", e
957 );
958 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
959 }
960 }
961 }
962 }
963 }
964 tracing::info!("β
Mailbox processing loop terminated gracefully");
965 });
966
967 task_handles.push(mailbox_handle);
968 }
969 tracing::info!("β
Mailbox processing loop started");
970
971 tracing::info!("β
ActrNode started successfully");
972
973 // βββββββββββββββββββββββββββββββββββββββββββ
974 // 6. Create ActrRef for Shell to interact with Workload
975 // βββββββββββββββββββββββββββββββββββββββββββ
976 use crate::actr_ref::{ActrRef, ActrRefShared};
977 use crate::outbound::InprocOutGate;
978
979 // Create InprocOutGate from shell_to_workload transport manager
980 let shell_to_workload = node_ref
981 .inproc_mgr
982 .clone()
983 .expect("inproc_mgr must be initialized");
984 let inproc_gate = Arc::new(InprocOutGate::new(shell_to_workload));
985
986 // Create ActrRefShared
987 let actr_ref_shared = Arc::new(ActrRefShared {
988 actor_id: actor_id_for_shell.clone(),
989 inproc_gate,
990 shutdown_token: shutdown_token.clone(),
991 task_handles,
992 });
993
994 // Create ActrRef
995 let actr_ref = ActrRef::new(actr_ref_shared);
996
997 tracing::info!("β
ActrRef created (Shell β Workload communication handle)");
998
999 Ok(actr_ref)
1000 }
1001}