1use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DedupOutcome, DedupState};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21 AIdCredential, ActorResult, ActrError, ActrId, PayloadType, RegisterAuthMode, RegisterRequest,
22 RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
/// Internal state of an actor node.
///
/// Values are created by `Inner::build()`. Several fields are `None` after `build()` and are
/// only populated later (by `start()`, `create_network_event_handle()` or
/// `set_preregistered_credential()`).
pub(crate) struct Inner {
    /// Runtime configuration handed to `build()`.
    pub(crate) config: actr_config::RuntimeConfig,

    /// Mailbox storage; `build()` backs it with a `SqliteMailbox`.
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead letter queue; `build()` backs it with a standalone `SqliteDeadLetterQueue`.
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// In-process gate; `build()` creates it as `Gate::Host` over `shell_to_workload`.
    pub(crate) inproc_gate: Gate,

    /// Out-of-process gate; `None` after `build()`, set to a `Gate::Peer` by `start()`.
    pub(crate) outproc_gate: Option<Gate>,

    /// Registry handed to every runtime context and to the WebRTC gate.
    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,

    /// Registry handed to every runtime context and to the WebRTC coordinator.
    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,

    /// Signaling client; `build()` creates a `WebSocketSignalingClient` and `start()` connects it.
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// This node's actor id; `None` until `start()` stores the id from the registration result.
    pub(crate) actor_id: Option<ActrId>,

    /// Shared credential state; `None` until `start()` stores it.
    pub(crate) credential_state: Option<CredentialState>,

    /// WebRTC coordinator; `None` after `build()`, set by `start()`.
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    /// WebRTC gate; `None` after `build()`.
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// WebSocket gate; `None` after `build()`.
    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,

    /// Host transport created in `build()`; also the transport behind `inproc_gate`.
    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,

    /// Second host transport created in `build()`
    /// (presumably the workload-to-shell direction, going by the name; confirm with its users).
    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,

    /// Cancellation token created fresh in `build()`.
    pub(crate) shutdown_token: CancellationToken,

    /// Packaged lock file passed to `build()`, if the package contained one.
    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,
    /// Receiving end of the network-event channel; set by `create_network_event_handle()`.
    pub(crate) network_event_rx:
        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>>,

    /// Sending end of the network-event result channel; set by `create_network_event_handle()`.
    pub(crate) network_event_result_tx:
        Option<tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>>,

    /// Debounce settings; `Some` only when `create_network_event_handle()` was given a
    /// non-zero `debounce_ms`.
    pub(crate) network_event_debounce_config:
        Option<crate::lifecycle::network_event::DebounceConfig>,

    /// Request-id deduplication state consulted and completed by `handle_incoming()`.
    pub(crate) dedup_state: Arc<Mutex<DedupState>>,

    /// Package manifest passed to `build()`; exposed through `package_manifest()`.
    #[allow(dead_code)]
    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,

    /// Registration result injected through `set_preregistered_credential()`; `start()` takes
    /// it and, when present, skips AIS registration.
    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,

    /// Map from actor id to an address string, shared with the wire builder in `start()`
    /// (presumably WebSocket URLs of discovered peers; confirm with the code that fills it).
    pub(crate) discovered_ws_addresses:
        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,

    /// The workload, behind a mutex that is locked for every dispatch.
    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,

    /// Optional hook observer passed to `build_hook_callback` in `start()`; `None` after `build()`.
    #[allow(dead_code)]
    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,

    /// Threshold value passed to `build()` (its consumer is not visible in this part of the file).
    pub(crate) mailbox_backpressure_threshold: usize,

    /// Duration passed to `build()` (its consumer is not visible in this part of the file).
    #[allow(dead_code)]
    pub(crate) credential_expiry_warning: Duration,
}
176
/// Cloneable handle to credential data.
///
/// The data lives behind an `Arc<RwLock<..>>`, so every clone of this handle reads and updates
/// the same underlying state.
#[derive(Clone)]
pub struct CredentialState {
    /// Shared, lock-protected credential data.
    inner: Arc<RwLock<CredentialStateInner>>,
}
182
/// The data guarded by the lock inside [`CredentialState`].
#[derive(Clone)]
struct CredentialStateInner {
    /// Current actor credential.
    credential: AIdCredential,
    /// Expiry timestamp of `credential`, when one is known.
    expires_at: Option<prost_types::Timestamp>,
    /// TURN credential, when one is known.
    turn_credential: Option<TurnCredential>,
}
190
191impl CredentialState {
192 pub fn new(
194 credential: AIdCredential,
195 expires_at: Option<prost_types::Timestamp>,
196 turn_credential: Option<TurnCredential>,
197 ) -> Self {
198 Self {
199 inner: Arc::new(RwLock::new(CredentialStateInner {
200 credential,
201 expires_at,
202 turn_credential,
203 })),
204 }
205 }
206
207 pub async fn credential(&self) -> AIdCredential {
208 self.inner.read().await.credential.clone()
209 }
210
211 pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
212 self.inner.read().await.expires_at
213 }
214
215 pub async fn turn_credential(&self) -> Option<TurnCredential> {
217 self.inner.read().await.turn_credential.clone()
218 }
219
220 pub(crate) async fn update(
224 &self,
225 credential: AIdCredential,
226 expires_at: Option<prost_types::Timestamp>,
227 turn_credential: Option<TurnCredential>,
228 ) {
229 let mut guard = self.inner.write().await;
230 guard.credential = credential;
231 guard.expires_at = expires_at;
232 if turn_credential.is_some() {
233 guard.turn_credential = turn_credential;
234 }
235 }
236}
237
238async fn host_operation_handler(
242 ctx: crate::context::RuntimeContext,
243 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
244 pending: crate::workload::HostOperation,
245) -> crate::workload::HostOperationResult {
246 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
247 use actr_framework::guest::dynclib_abi::code as abi_code;
248 use actr_framework::{Context as _, Dest};
249 use actr_protocol::{DataStream, PayloadType};
250
251 fn actr_error_to_code(err: &ActrError) -> i32 {
253 match err {
254 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
255 _ => abi_code::GENERIC_ERROR,
256 }
257 }
258
259 match pending {
260 HostOperation::CallRaw(req) => {
261 match ctx
262 .call_raw(
263 &Dest::Actor(req.target),
264 req.route_key,
265 PayloadType::RpcReliable,
266 bytes::Bytes::from(req.payload),
267 30_000,
268 )
269 .await
270 {
271 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
272 Err(e) => {
273 tracing::error!("call_raw routing failed: {e:?}");
274 HostOperationResult::Error(actr_error_to_code(&e))
275 }
276 }
277 }
278
279 HostOperation::Call(req) => {
280 let dest = match decode_dest(&req.dest) {
281 Some(d) => d,
282 None => {
283 tracing::error!(route_key = req.route_key, "call: dest decode failed");
284 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
285 }
286 };
287 match ctx
288 .call_raw(
289 &dest,
290 req.route_key,
291 PayloadType::RpcReliable,
292 bytes::Bytes::from(req.payload),
293 30_000,
294 )
295 .await
296 {
297 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
298 Err(e) => {
299 tracing::error!("call routing failed: {e:?}");
300 HostOperationResult::Error(actr_error_to_code(&e))
301 }
302 }
303 }
304
305 HostOperation::Tell(req) => {
306 let dest = match decode_dest(&req.dest) {
307 Some(d) => d,
308 None => {
309 tracing::error!(route_key = req.route_key, "tell: dest decode failed");
310 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
311 }
312 };
313 match ctx
314 .tell_raw(
315 &dest,
316 req.route_key,
317 PayloadType::RpcReliable,
318 bytes::Bytes::from(req.payload),
319 )
320 .await
321 {
322 Ok(()) => HostOperationResult::Done,
323 Err(e) => {
324 tracing::error!("tell routing failed: {e:?}");
325 HostOperationResult::Error(actr_error_to_code(&e))
326 }
327 }
328 }
329
330 HostOperation::Discover(req) => {
331 match ctx.discover_route_candidate(&req.target_type).await {
332 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
333 Err(e) => {
334 tracing::error!("discover failed: {e:?}");
335 HostOperationResult::Error(actr_error_to_code(&e))
336 }
337 }
338 }
339
340 HostOperation::RegisterStream(req) => {
341 let stream_id = req.stream_id;
342 let callback_ctx = ctx.clone();
343 let callback_workload_dispatch = workload_dispatch.clone();
344 match ctx
345 .register_stream(stream_id, move |chunk: DataStream, sender| {
346 let ctx_for_executor = callback_ctx.clone();
347 let workload_dispatch = callback_workload_dispatch.clone();
348 Box::pin(async move {
349 let invocation = crate::workload::InvocationContext {
350 self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
351 caller_id: Some(sender.clone()),
352 request_id: format!(
353 "data-stream:{}:{}",
354 chunk.stream_id, chunk.sequence
355 ),
356 };
357 let call_executor: crate::workload::HostAbiFn =
358 std::sync::Arc::new(move |pending| {
359 let ctx = ctx_for_executor.clone();
360 Box::pin(async move {
361 stream_callback_host_operation_handler(ctx, pending).await
362 })
363 });
364 let mut guard = workload_dispatch.lock().await;
365 guard
366 .dispatch_data_stream(chunk, sender, invocation, &call_executor)
367 .await
368 })
369 })
370 .await
371 {
372 Ok(()) => HostOperationResult::Done,
373 Err(e) => {
374 tracing::error!("register_stream failed: {e:?}");
375 HostOperationResult::Error(actr_error_to_code(&e))
376 }
377 }
378 }
379
380 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
381 Ok(()) => HostOperationResult::Done,
382 Err(e) => {
383 tracing::error!("unregister_stream failed: {e:?}");
384 HostOperationResult::Error(actr_error_to_code(&e))
385 }
386 },
387
388 HostOperation::SendDataStream(req) => {
389 let dest = match decode_dest(&req.dest) {
390 Some(d) => d,
391 None => {
392 tracing::error!("send_data_stream: dest decode failed");
393 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
394 }
395 };
396 let payload_type = match PayloadType::try_from(req.payload_type) {
397 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
398 PayloadType::try_from(req.payload_type).expect("checked payload type")
399 }
400 Ok(other) => {
401 tracing::error!(?other, "send_data_stream: invalid stream payload type");
402 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
403 }
404 Err(_) => {
405 tracing::error!(
406 payload_type = req.payload_type,
407 "send_data_stream: unknown payload type"
408 );
409 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
410 }
411 };
412 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
413 Ok(()) => HostOperationResult::Done,
414 Err(e) => {
415 tracing::error!("send_data_stream failed: {e:?}");
416 HostOperationResult::Error(actr_error_to_code(&e))
417 }
418 }
419 }
420 }
421}
422
423fn lifecycle_invocation(
424 actor_id: &ActrId,
425 request_id: &'static str,
426) -> crate::workload::InvocationContext {
427 crate::workload::InvocationContext {
428 self_id: actor_id.clone(),
429 caller_id: None,
430 request_id: request_id.to_string(),
431 }
432}
433
434pub(crate) fn lifecycle_host_abi(
435 ctx: crate::context::RuntimeContext,
436 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
437) -> crate::workload::HostAbiFn {
438 std::sync::Arc::new(move |pending| {
439 let ctx = ctx.clone();
440 let workload_dispatch = workload_dispatch.clone();
441 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
442 })
443}
444
445async fn stream_callback_host_operation_handler(
446 ctx: crate::context::RuntimeContext,
447 pending: crate::workload::HostOperation,
448) -> crate::workload::HostOperationResult {
449 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
450 use actr_framework::guest::dynclib_abi::code as abi_code;
451 use actr_framework::{Context as _, Dest};
452 use actr_protocol::PayloadType;
453
454 fn actr_error_to_code(err: &ActrError) -> i32 {
455 match err {
456 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
457 _ => abi_code::GENERIC_ERROR,
458 }
459 }
460
461 match pending {
462 HostOperation::CallRaw(req) => {
463 match ctx
464 .call_raw(
465 &Dest::Actor(req.target),
466 req.route_key,
467 PayloadType::RpcReliable,
468 bytes::Bytes::from(req.payload),
469 30_000,
470 )
471 .await
472 {
473 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
474 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
475 }
476 }
477 HostOperation::Call(req) => {
478 let dest = match decode_dest(&req.dest) {
479 Some(d) => d,
480 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
481 };
482 match ctx
483 .call_raw(
484 &dest,
485 req.route_key,
486 PayloadType::RpcReliable,
487 bytes::Bytes::from(req.payload),
488 30_000,
489 )
490 .await
491 {
492 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
493 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
494 }
495 }
496 HostOperation::Tell(req) => {
497 let dest = match decode_dest(&req.dest) {
498 Some(d) => d,
499 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
500 };
501 match ctx
502 .tell_raw(
503 &dest,
504 req.route_key,
505 PayloadType::RpcReliable,
506 bytes::Bytes::from(req.payload),
507 )
508 .await
509 {
510 Ok(()) => HostOperationResult::Done,
511 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
512 }
513 }
514 HostOperation::Discover(req) => {
515 match ctx.discover_route_candidate(&req.target_type).await {
516 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
517 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
518 }
519 }
520 HostOperation::RegisterStream(_) => {
521 tracing::error!("register_stream from inside a stream callback is not supported");
522 HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
523 }
524 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
525 Ok(()) => HostOperationResult::Done,
526 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
527 },
528 HostOperation::SendDataStream(req) => {
529 let dest = match decode_dest(&req.dest) {
530 Some(d) => d,
531 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
532 };
533 let payload_type = match PayloadType::try_from(req.payload_type) {
534 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
535 PayloadType::try_from(req.payload_type).expect("checked payload type")
536 }
537 Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
538 };
539 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
540 Ok(()) => HostOperationResult::Done,
541 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
542 }
543 }
544 }
545}
546
547fn protocol_error_to_code(err: &ActrError) -> u32 {
549 match err {
550 ActrError::Unavailable(_) => 503, ActrError::TimedOut => 504, ActrError::NotFound(_) => 404, ActrError::PermissionDenied(_) => 403, ActrError::InvalidArgument(_) => 400, ActrError::UnknownRoute(_) => 404, ActrError::DependencyNotFound { .. } => 400, ActrError::DecodeFailure(_) => 400, ActrError::NotImplemented(_) => 501, ActrError::Internal(_) => 500, }
561}
562
563impl Inner {
564 #[allow(dead_code)]
565 pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
566 self.package_manifest.as_ref()
567 }
568
569 async fn network_event_loop(
576 event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>,
577 result_tx: tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>,
578 event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
579 shutdown_token: CancellationToken,
580 ) {
581 crate::lifecycle::network_event::run_network_event_reconciler(
582 event_rx,
583 result_tx,
584 event_processor,
585 shutdown_token,
586 )
587 .await;
588 }
589
590 #[cfg_attr(
593 feature = "opentelemetry",
594 tracing::instrument(
595 skip_all,
596 name = "ActrNode.handle_incoming",
597 fields(
598 actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
599 route_key = %envelope.route_key,
600 request_id = %envelope.request_id,
601 )
602 )
603 )]
604 pub async fn handle_incoming(
605 &self,
606 envelope: RpcEnvelope,
607 caller_id: Option<&ActrId>,
608 ) -> ActorResult<Bytes> {
609 if let Some(caller) = caller_id {
611 tracing::debug!(
612 "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
613 envelope.route_key,
614 caller,
615 envelope.request_id
616 );
617 } else {
618 tracing::debug!(
619 "📨 Handling incoming message: route_key={}, request_id={}",
620 envelope.route_key,
621 envelope.request_id
622 );
623 }
624
625 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
627 ActrError::Internal(
628 "Actor ID not set - node must be started before handling messages".to_string(),
629 )
630 })?;
631
632 let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
634 .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
635
636 if !acl_allowed {
637 tracing::warn!(
638 severity = 5,
639 error_category = "acl_denied",
640 request_id = %envelope.request_id,
641 route_key = %envelope.route_key,
642 caller = %caller_id
643 .map(|c| c.to_string())
644 .unwrap_or_else(|| "<none>".to_string()),
645 "🚫 ACL: Permission denied"
646 );
647
648 return Err(ActrError::PermissionDenied(format!(
649 "ACL denied: {} is not allowed to call {}",
650 caller_id
651 .map(|c| c.to_string())
652 .unwrap_or_else(|| "<unknown>".to_string()),
653 actor_id
654 )));
655 }
656
657 {
659 let outcome = self
660 .dedup_state
661 .lock()
662 .await
663 .check_or_mark(&envelope.request_id);
664 match outcome {
665 DedupOutcome::Fresh => {} DedupOutcome::InFlight => {
667 tracing::warn!(
668 request_id = %envelope.request_id,
669 route_key = %envelope.route_key,
670 "⚠️ duplicate request in-flight, dropping concurrent copy"
671 );
672 return Err(ActrError::InvalidArgument(
673 "duplicate request already in-flight".to_string(),
674 ));
675 }
676 DedupOutcome::Duplicate(cached) => {
677 tracing::debug!(
678 request_id = %envelope.request_id,
679 route_key = %envelope.route_key,
680 "♻️ returning cached response for duplicate request_id"
681 );
682 return cached;
683 }
684 }
685 }
686
687 let credential_state = self.credential_state.clone().ok_or_else(|| {
689 ActrError::Internal(
690 "Credential not set - node must be started before handling messages".to_string(),
691 )
692 })?;
693 let ctx = self.make_runtime_context(
694 actor_id,
695 caller_id, &envelope.request_id,
697 &credential_state.credential().await,
698 );
699
700 let dispatch_ctx = crate::workload::InvocationContext {
702 self_id: actor_id.clone(),
703 caller_id: caller_id.cloned(),
704 request_id: envelope.request_id.clone(),
705 };
706 let ctx_for_executor = ctx.clone();
707 let workload_for_executor = self.workload_dispatch.clone();
708 let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
709 let ctx = ctx_for_executor.clone();
710 let workload_dispatch = workload_for_executor.clone();
711 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
712 });
713
714 let mut guard = self.workload_dispatch.lock().await;
715 let result = guard
716 .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
717 .await
718 .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e:?}")));
719
720 match &result {
721 Ok(_) => tracing::debug!(
722 request_id = %envelope.request_id,
723 route_key = %envelope.route_key,
724 "✅ Message handled successfully"
725 ),
726 Err(e) => tracing::error!(
727 severity = 6,
728 error_category = "handler_error",
729 request_id = %envelope.request_id,
730 route_key = %envelope.route_key,
731 "❌ Message handling failed: {:?}", e
732 ),
733 }
734
735 self.dedup_state
737 .lock()
738 .await
739 .complete(&envelope.request_id, result.clone());
740
741 result
742 }
743
744 pub(crate) async fn build(
749 config: actr_config::RuntimeConfig,
750 workload: crate::workload::Workload,
751 package_manifest: Option<actr_pack::PackageManifest>,
752 packaged_lock: Option<actr_config::lock::LockFile>,
753 mailbox_backpressure_threshold: usize,
754 credential_expiry_warning: Duration,
755 ) -> ActorResult<Self> {
756 use crate::outbound::{Gate, HostGate};
757 use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
758
759 tracing::info!("🚀 Initializing ActrNode");
760
761 let mailbox_path = config
763 .mailbox_path
764 .as_ref()
765 .map(|p| p.to_string_lossy().to_string())
766 .unwrap_or_else(|| ":memory:".to_string());
767
768 tracing::info!("📂 Mailbox database path: {}", mailbox_path);
769
770 let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
771 actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
772 .await
773 .map_err(|e| {
774 actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
775 })?,
776 );
777
778 let dlq_path = if mailbox_path == ":memory:" {
780 ":memory:".to_string()
781 } else {
782 format!("{mailbox_path}.dlq")
783 };
784
785 let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
786 actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
787 .await
788 .map_err(|e| {
789 actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
790 })?,
791 );
792 tracing::info!("✅ Dead Letter Queue initialized");
793
794 let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
796 Some("answer".to_string())
797 } else {
798 None
799 };
800
801 let signaling_config = SignalingConfig {
802 server_url: config.signaling_url.clone(),
803 connection_timeout: 30,
804 heartbeat_interval: 30,
805 reconnect_config: ReconnectConfig::default(),
806 auth_config: None,
807 webrtc_role,
808 };
809
810 let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
811 client.start_reconnect_manager();
812 let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
813
814 let shell_to_workload = Arc::new(HostTransport::new());
816 let workload_to_shell = Arc::new(HostTransport::new());
817 let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
818
819 let data_stream_registry = Arc::new(DataStreamRegistry::new());
820 let media_frame_registry = Arc::new(MediaFrameRegistry::new());
821
822 tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
823
824 let actr_lock = if let Some(lock) = packaged_lock {
825 tracing::info!(
826 "📋 Loaded packaged manifest.lock.toml with {} dependencies",
827 lock.dependencies.len()
828 );
829 Some(Arc::new(lock))
830 } else {
831 tracing::warn!(
832 "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
833 );
834 None
835 };
836
837 tracing::info!("✅ ActrNode initialized");
838
839 Ok(Self {
840 config,
841 mailbox,
842 dlq,
843 inproc_gate,
844 outproc_gate: None, data_stream_registry,
846 media_frame_registry,
847 signaling_client,
848 actor_id: None,
849 credential_state: None,
850 webrtc_coordinator: None,
851 webrtc_gate: None,
852 websocket_gate: None,
853 shell_to_workload: Some(shell_to_workload),
854 workload_to_shell: Some(workload_to_shell),
855 shutdown_token: CancellationToken::new(),
856 actr_lock,
857 network_event_rx: None,
858 network_event_result_tx: None,
859 network_event_debounce_config: None,
860 dedup_state: Arc::new(Mutex::new(DedupState::new())),
861 package_manifest,
862 preregistered_credential: None,
863 discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
864 std::collections::HashMap::new(),
865 )),
866 workload_dispatch: Arc::new(Mutex::new(workload)),
867 hook_observer: None,
868 mailbox_backpressure_threshold,
869 credential_expiry_warning,
870 })
871 }
872
873 pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
881 BootstrapContextBuilder::new(
882 self.inproc_gate.clone(),
883 self.outproc_gate.clone(),
884 self.data_stream_registry.clone(),
885 self.media_frame_registry.clone(),
886 self.signaling_client.clone(),
887 self.actr_lock.clone(),
888 )
889 }
890
891 pub(crate) fn make_runtime_context(
896 &self,
897 self_id: &ActrId,
898 caller_id: Option<&ActrId>,
899 request_id: &str,
900 credential: &AIdCredential,
901 ) -> RuntimeContext {
902 RuntimeContext::new(
903 self_id.clone(),
904 caller_id.cloned(),
905 request_id.to_string(),
906 self.inproc_gate.clone(),
907 self.outproc_gate.clone(),
908 self.data_stream_registry.clone(),
909 self.media_frame_registry.clone(),
910 self.signaling_client.clone(),
911 credential.clone(),
912 self.actr_lock.clone(),
913 )
914 }
915
916 pub fn create_network_event_handle(
924 &mut self,
925 debounce_ms: u64,
926 ) -> crate::lifecycle::NetworkEventHandle {
927 if self.network_event_rx.is_some() {
928 panic!("create_network_event_handle() can only be called once");
929 }
930
931 let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
932 let (result_tx, result_rx) = tokio::sync::mpsc::channel(100);
933
934 let debounce_config = if debounce_ms > 0 {
935 Some(crate::lifecycle::network_event::DebounceConfig {
936 window: std::time::Duration::from_millis(debounce_ms),
937 })
938 } else {
939 None
940 };
941
942 self.network_event_rx = Some(event_rx);
943 self.network_event_result_tx = Some(result_tx);
944 self.network_event_debounce_config = debounce_config;
945
946 crate::lifecycle::NetworkEventHandle::new(event_tx, result_rx)
947 }
948
949 pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
954 tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
955 self.preregistered_credential = Some(register_ok);
956 }
957
958 pub async fn start(mut self) -> ActorResult<ActrRef> {
960 tracing::info!("🚀 Starting ActrNode");
961 tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
962
963 let actr_type = self.config.actr_type().clone();
968 tracing::info!("📋 Actor type: {}", actr_type);
969
970 let service_spec = None;
976
977 let ws_address = if let Some(port) = self.config.websocket_listen_port {
980 let host = self
981 .config
982 .websocket_advertised_host
983 .as_deref()
984 .unwrap_or("127.0.0.1");
985 Some(format!("ws://{}:{}", host, port))
986 } else {
987 None
988 };
989
990 if let Some(ref addr) = ws_address {
991 tracing::info!(
992 "📡 Advertising WebSocket address to signaling server: {}",
993 addr
994 );
995 }
996
997 let register_request = RegisterRequest {
998 actr_type: actr_type.clone(),
999 realm: self.config.realm,
1000 service_spec,
1001 acl: self.config.acl.clone(),
1002 service: None,
1003 ws_address,
1004 auth_mode: Some(RegisterAuthMode::Linked as i32),
1005 ..Default::default()
1006 };
1007
1008 let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1012 tracing::info!(
1013 "Using Hyper pre-injected registration credential; skipping AIS registration"
1014 );
1015 injected
1016 } else {
1017 let ais_endpoint = &self.config.ais_endpoint;
1018 tracing::info!(
1019 ais_endpoint = %ais_endpoint,
1020 "Registering actor with AIS via HTTP"
1021 );
1022 let mut ais = AisClient::new(ais_endpoint);
1023 if let Some(ref secret) = self.config.realm_secret {
1024 ais = ais.with_realm_secret(secret);
1025 }
1026 let resp = ais
1027 .register_linked(register_request.clone())
1028 .await
1029 .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1030 match resp.result {
1031 Some(register_response::Result::Success(ok)) => {
1032 tracing::info!("✅ AIS HTTP registration successful");
1033 ok
1034 }
1035 Some(register_response::Result::Error(error)) => {
1036 tracing::error!(
1037 severity = 10,
1038 error_category = "registration_error",
1039 error_code = error.code,
1040 "❌ AIS registration failed: code={}, message={}",
1041 error.code,
1042 error.message
1043 );
1044 return Err(ActrError::Unavailable(format!(
1045 "AIS registration rejected: {} (code: {})",
1046 error.message, error.code
1047 )));
1048 }
1049 None => {
1050 tracing::error!(
1051 severity = 10,
1052 error_category = "registration_error",
1053 "❌ AIS registration response missing result"
1054 );
1055 return Err(ActrError::Unavailable(
1056 "Invalid AIS registration response: missing result".to_string(),
1057 ));
1058 }
1059 }
1060 };
1061
1062 let pre_connect_credential_state = {
1069 let actor_id = register_ok.actr_id.clone();
1070 let credential_state = CredentialState::new(
1071 register_ok.credential.clone(),
1072 register_ok.credential_expires_at,
1073 Some(register_ok.turn_credential.clone()),
1074 );
1075 self.signaling_client.set_actor_id(actor_id).await;
1076 self.signaling_client
1077 .set_credential_state(credential_state.clone())
1078 .await;
1079 credential_state
1080 };
1081
1082 {
1088 let actor_id = register_ok.actr_id.clone();
1089 let credential_state = pre_connect_credential_state.clone();
1090 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1094 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1095 let snapshot = ctx_builder_snapshot.clone();
1096 let actor_id = actor_id.clone();
1097 let credential_state = credential_state.clone();
1098 Box::pin(async move {
1099 Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1100 })
1101 });
1102 let cb = crate::lifecycle::hooks::build_hook_callback(
1103 self.hook_observer.clone(),
1104 ctx_builder,
1105 );
1106 self.signaling_client.set_hook_callback(cb);
1107 }
1108
1109 tracing::info!("📡 Connecting to signaling server (with credential)");
1110 self.signaling_client
1111 .connect()
1112 .await
1113 .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1114 tracing::info!("✅ Connected to signaling server");
1115
1116 let mut task_handles = Vec::new();
1118
1119 let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1123
1124 {
1125 let actor_id = register_ok.actr_id;
1126 let credential = register_ok.credential;
1127
1128 tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1129 tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1130 tracing::info!(
1131 "💓 Signaling heartbeat interval: {} seconds",
1132 register_ok.signaling_heartbeat_interval_secs
1133 );
1134
1135 tracing::debug!("TurnCredential received, TURN authentication ready");
1137
1138 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1139 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1140 }
1141
1142 self.actor_id = Some(actor_id.clone());
1144 let credential_state = CredentialState::new(
1145 credential,
1146 register_ok.credential_expires_at,
1147 Some(register_ok.turn_credential.clone()),
1148 );
1149 self.credential_state = Some(credential_state.clone());
1150
1151 node_hook_callback =
1162 {
1163 let actor_id_for_hook = actor_id.clone();
1164 let credential_state_for_hook = credential_state.clone();
1165 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1170 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1171 Arc::new(move || {
1172 let snapshot = ctx_builder_snapshot.clone();
1173 let actor_id = actor_id_for_hook.clone();
1174 let credential_state = credential_state_for_hook.clone();
1175 Box::pin(async move {
1176 Some(snapshot.build_bootstrap(
1177 &actor_id,
1178 &credential_state.credential().await,
1179 ))
1180 })
1181 });
1182 Some(crate::lifecycle::hooks::build_hook_callback(
1183 self.hook_observer.clone(),
1184 ctx_builder,
1185 ))
1186 };
1187
1188 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1193 let new_expiry = std::time::UNIX_EPOCH
1194 + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1195 if let Some(cb) = node_hook_callback.as_ref() {
1196 cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1197 } else {
1198 tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1199 }
1200 }
1201
1202 tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1210
1211 tracing::info!("🌐 Initializing WebRTC infrastructure");
1215
1216 let media_frame_registry = self.media_frame_registry.clone();
1217
1218 let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1220 actor_id.clone(),
1221 credential_state.clone(),
1222 self.signaling_client.clone(),
1223 self.config.webrtc.clone(),
1224 media_frame_registry,
1225 ));
1226
1227 {
1231 let actor_id_for_hook = actor_id.clone();
1232 let credential_state_for_hook = credential_state.clone();
1233 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1237 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1238 Arc::new(move || {
1239 let snapshot = ctx_builder_snapshot.clone();
1240 let actor_id = actor_id_for_hook.clone();
1241 let credential_state = credential_state_for_hook.clone();
1242 Box::pin(async move {
1243 Some(
1244 snapshot.build_bootstrap(
1245 &actor_id,
1246 &credential_state.credential().await,
1247 ),
1248 )
1249 })
1250 });
1251 let cb = crate::lifecycle::hooks::build_hook_callback(
1252 self.hook_observer.clone(),
1253 ctx_builder,
1254 );
1255 coordinator.set_hook_callback(cb);
1256 }
1257
1258 tracing::info!("🏗️ Creating PeerTransport with WebRTC support");
1262
1263 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1265
1266 let local_id_hex = hex::encode(actor_id.encode_to_vec());
1269 let wire_builder_config = DefaultWireBuilderConfig {
1270 local_id_hex,
1271 enable_webrtc: true,
1272 enable_websocket: true,
1273 discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1276 credential_state: Some(credential_state.clone()),
1279 };
1280 let wire_builder = Arc::new(DefaultWireBuilder::new(
1281 Some(coordinator.clone()),
1282 wire_builder_config,
1283 ));
1284
1285 use crate::transport::PeerTransport;
1287 let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1288
1289 use crate::outbound::PeerGate;
1291 let outproc_gate =
1292 Arc::new(PeerGate::new(transport_manager, Some(coordinator.clone())));
1293 let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1294 tracing::info!("PeerTransport + PeerGate initialized");
1295
1296 let data_stream_registry = self.data_stream_registry.clone();
1297
1298 let pending_requests = outproc_gate.get_pending_requests();
1300 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1301 coordinator.clone(),
1302 pending_requests,
1303 data_stream_registry.clone(),
1304 ));
1305 gate.set_local_id(actor_id.clone()).await;
1307 tracing::info!(
1308 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1309 );
1310
1311 tracing::info!("🔧 Wiring outproc_gate into node");
1318 self.outproc_gate = Some(outproc_gate_enum);
1319 tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1320
1321 self.webrtc_coordinator = Some(coordinator.clone());
1323 self.webrtc_gate = Some(gate.clone());
1324 tracing::info!("✅ WebRTC infrastructure initialized");
1325
1326 {
1330 let startup_ctx = self
1331 .bootstrap_ctx_builder()
1332 .build_bootstrap(&actor_id, &credential_state.credential().await);
1333 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1334 let call_executor =
1335 lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1336 let mut workload = self.workload_dispatch.lock().await;
1337 crate::lifecycle::hooks::call_lifecycle_hook(
1338 "on_start",
1339 workload.on_start(startup_ctx, invocation, &call_executor),
1340 )
1341 .await?;
1342 }
1343
1344 if let Some(listen_port) = self.config.websocket_listen_port {
1348 tracing::info!(
1349 "🔌 WebSocket direct-connect mode enabled, binding port {}",
1350 listen_port
1351 );
1352 use crate::key_cache::AisKeyCache;
1353 use crate::wire::websocket::gate::WsAuthContext;
1354 use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1355
1356 let ais_key_cache = AisKeyCache::new();
1358 if !register_ok.signing_pubkey.is_empty() {
1359 match ais_key_cache
1360 .seed(register_ok.signing_key_id, ®ister_ok.signing_pubkey)
1361 .await
1362 {
1363 Ok(()) => tracing::info!(
1364 key_id = register_ok.signing_key_id,
1365 "🔑 AisKeyCache seeded from RegisterOk"
1366 ),
1367 Err(e) => tracing::warn!(
1368 key_id = register_ok.signing_key_id,
1369 error = ?e,
1370 "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1371 ),
1372 }
1373 } else {
1374 tracing::warn!(
1375 "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1376 );
1377 }
1378
1379 let auth_ctx = WsAuthContext {
1380 ais_key_cache,
1381 actor_id: actor_id.clone(),
1382 credential_state: credential_state.clone(),
1383 signaling_client: self.signaling_client.clone(),
1384 };
1385
1386 match WebSocketServer::bind(listen_port).await {
1387 Ok((ws_server, conn_rx)) => {
1388 ws_server.start(self.shutdown_token.clone());
1389 let ws_gate = Arc::new(WebSocketGate::new(
1390 conn_rx,
1391 outproc_gate.get_pending_requests(),
1392 data_stream_registry.clone(),
1393 Some(auth_ctx),
1394 ));
1395
1396 {
1398 let actor_id_for_hook = actor_id.clone();
1399 let credential_state_for_hook = credential_state.clone();
1400 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1404 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1405 Arc::new(move || {
1406 let snapshot = ctx_builder_snapshot.clone();
1407 let actor_id = actor_id_for_hook.clone();
1408 let credential_state = credential_state_for_hook.clone();
1409 Box::pin(async move {
1410 Some(snapshot.build_bootstrap(
1411 &actor_id,
1412 &credential_state.credential().await,
1413 ))
1414 })
1415 });
1416 let cb = crate::lifecycle::hooks::build_hook_callback(
1417 self.hook_observer.clone(),
1418 ctx_builder,
1419 );
1420 ws_gate.set_hook_callback(cb);
1421 }
1422
1423 self.websocket_gate = Some(ws_gate);
1424 tracing::info!(
1425 "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1426 );
1427 }
1428 Err(e) => {
1429 tracing::error!(
1430 "❌ Failed to bind WebSocket server on port {}: {:?}",
1431 listen_port,
1432 e
1433 );
1434 }
1435 }
1436 }
1437
1438 {
1447 let shutdown = self.shutdown_token.clone();
1448 let client = self.signaling_client.clone();
1449 let actor_id_for_heartbeat = actor_id.clone();
1450 let credential_state_for_heartbeat = credential_state.clone();
1451 let mailbox_for_heartbeat = self.mailbox.clone();
1452 let register_request_for_heartbeat = register_request.clone();
1453
1454 let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1456 let heartbeat_interval = if heartbeat_interval_secs > 0 {
1457 Duration::from_secs(heartbeat_interval_secs as u64)
1458 } else {
1459 Duration::from_secs(30)
1460 };
1461 let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1462 let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1463 shutdown,
1464 client,
1465 actor_id_for_heartbeat,
1466 credential_state_for_heartbeat,
1467 mailbox_for_heartbeat,
1468 heartbeat_interval,
1469 register_request_for_heartbeat,
1470 ais_endpoint_for_heartbeat,
1471 node_hook_callback.clone(),
1472 ));
1473 task_handles.push(heartbeat_handle);
1474 }
1475 tracing::info!(
1476 "✅ Heartbeat task started (interval: {}s)",
1477 register_ok.signaling_heartbeat_interval_secs
1478 );
1479
1480 if let (Some(event_rx), Some(result_tx)) = (
1484 self.network_event_rx.take(),
1485 self.network_event_result_tx.take(),
1486 ) {
1487 use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1488
1489 let event_processor =
1492 if let Some(config) = self.network_event_debounce_config.clone() {
1493 Arc::new(DefaultNetworkEventProcessor::new_with_debounce(
1494 self.signaling_client.clone(),
1495 self.webrtc_coordinator.clone(),
1496 config,
1497 ))
1498 } else {
1499 Arc::new(DefaultNetworkEventProcessor::new(
1500 self.signaling_client.clone(),
1501 self.webrtc_coordinator.clone(),
1502 ))
1503 };
1504
1505 let shutdown = self.shutdown_token.clone();
1506 let network_event_handle = tokio::spawn(async move {
1507 Self::network_event_loop(event_rx, result_tx, event_processor, shutdown).await;
1508 });
1509 task_handles.push(network_event_handle);
1510 tracing::info!("✅ Network event loop started");
1511 }
1512
1513 {
1514 let shutdown = self.shutdown_token.clone();
1525 let client = self.signaling_client.clone();
1526 let actor_id_for_unreg = actor_id.clone();
1527 let credential_state_for_unreg = credential_state.clone();
1528 let webrtc_coordinator = self.webrtc_coordinator.clone();
1529
1530 let unregister_handle = tokio::spawn(async move {
1531 shutdown.cancelled().await;
1533 tracing::info!(
1534 "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1535 actor_id_for_unreg
1536 );
1537
1538 if let Some(coord) = webrtc_coordinator {
1540 if let Err(e) = coord.close_all_peers().await {
1541 tracing::warn!(
1542 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1543 e
1544 );
1545 } else {
1546 tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1547 }
1548 } else {
1549 tracing::debug!(
1550 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1551 );
1552 }
1553
1554 let result = tokio::time::timeout(
1556 Duration::from_secs(5),
1557 client.send_unregister_request(
1558 actor_id_for_unreg.clone(),
1559 credential_state_for_unreg.credential().await,
1560 Some("Graceful shutdown".to_string()),
1561 ),
1562 )
1563 .await;
1564 tracing::info!("UnregisterRequest result: {:?}", result);
1565 match result {
1566 Ok(Ok(_)) => {
1567 tracing::info!(
1568 "✅ UnregisterRequest sent to signaling server for Actor {}",
1569 actor_id_for_unreg
1570 );
1571 }
1572 Ok(Err(e)) => {
1573 tracing::warn!(
1574 "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1575 actor_id_for_unreg,
1576 e
1577 );
1578 }
1579 Err(_) => {
1580 tracing::warn!(
1581 "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1582 actor_id_for_unreg
1583 );
1584 }
1585 }
1586 });
1587
1588 task_handles.push(unregister_handle);
1589 }
1590 } tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1596
1597 let actor_id = self
1602 .actor_id
1603 .as_ref()
1604 .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1605 .clone();
1606 let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1610 let credential_state = self
1611 .credential_state
1612 .clone()
1613 .expect("CredentialState must be initialized in start()");
1614 let shutdown_token = self.shutdown_token.clone();
1615 let node_ref = Arc::new(self);
1616
1617 {
1621 let node = node_ref.clone();
1622 let actor_id = actor_id.clone();
1623 let credential_state = credential_state.clone();
1624 let shutdown = shutdown_token.clone();
1625 let on_stop_handle = tokio::spawn(async move {
1626 shutdown.cancelled().await;
1627 let stop_ctx = node
1628 .bootstrap_ctx_builder()
1629 .build_bootstrap(&actor_id, &credential_state.credential().await);
1630 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1631 let call_executor =
1632 lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1633 let mut workload = node.workload_dispatch.lock().await;
1634 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1635 "on_stop",
1636 workload.on_stop(stop_ctx, invocation, &call_executor),
1637 )
1638 .await
1639 {
1640 tracing::warn!(error = %e, "workload on_stop returned Err");
1641 }
1642 });
1643 task_handles.push(on_stop_handle);
1644 }
1645
1646 tracing::info!("🚀 Starting WebRTC background loops");
1650
1651 if let Some(coordinator) = &node_ref.webrtc_coordinator {
1653 coordinator.clone().start().await.map_err(|e| {
1654 ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1655 })?;
1656 tracing::info!("✅ WebRtcCoordinator signaling loop started");
1657 }
1658
1659 if let Some(gate) = &node_ref.webrtc_gate {
1661 gate.start_receive_loop(node_ref.mailbox.clone())
1662 .await
1663 .map_err(|e| {
1664 ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1665 })?;
1666 tracing::info!("✅ WebRtcGate → Mailbox routing started");
1667 }
1668
1669 if let Some(ws_gate) = &node_ref.websocket_gate {
1671 ws_gate
1672 .start_receive_loop(node_ref.mailbox.clone())
1673 .await
1674 .map_err(|e| {
1675 ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1676 })?;
1677 tracing::info!("✅ WebSocketGate → Mailbox routing started");
1678 }
1679 tracing::info!("✅ WebRTC background loops started");
1680
1681 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1685 tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1686 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1688 let node = node_ref.clone();
1689 let request_rx_lane = shell_to_workload
1690 .get_lane(PayloadType::RpcReliable, None)
1691 .await
1692 .map_err(|e| {
1693 ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1694 })?;
1695 let response_tx = workload_to_shell.clone();
1696 let shutdown = shutdown_token.clone();
1697
1698 let inproc_handle = tokio::spawn(async move {
1699 loop {
1700 tokio::select! {
1701 _ = shutdown.cancelled() => {
1702 tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1703 break;
1704 }
1705 envelope_result = request_rx_lane.recv_envelope() => {
1706 match envelope_result {
1707 Ok(envelope) => {
1708 let request_id = envelope.request_id.clone();
1709 tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1710 #[cfg(feature = "opentelemetry")]
1712 let span = {
1713 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1714 let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1715 set_parent_from_rpc_envelope(&span, &envelope);
1716 span
1717 };
1718
1719 let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1721 #[cfg(feature = "opentelemetry")]
1722 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1723
1724 match handle_incoming_fut.await {
1725 Ok(response_bytes) => {
1726 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1729 let mut response_envelope = RpcEnvelope {
1730 route_key: envelope.route_key.clone(),
1731 payload: Some(response_bytes),
1732 error: None,
1733 traceparent: None,
1734 tracestate: None,
1735 request_id: request_id.clone(),
1736 metadata: Vec::new(),
1737 timeout_ms: 30000,
1738 };
1739 #[cfg(feature = "opentelemetry")]
1741 inject_span_context_to_rpc(&span, &mut response_envelope);
1742
1743 let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1745 #[cfg(feature = "opentelemetry")]
1746 let send_response_fut = send_response_fut.instrument(span.clone());
1747 if let Err(e) = send_response_fut.await {
1748 tracing::error!(
1749 severity = 7,
1750 error_category = "transport_error",
1751 request_id = %request_id,
1752 "❌ Failed to send RESPONSE to Shell: {:?}",
1753 e
1754 );
1755 }
1756 }
1757 Err(e) => {
1758 tracing::error!(
1759 severity = 6,
1760 error_category = "handler_error",
1761 request_id = %request_id,
1762 route_key = %envelope.route_key,
1763 "❌ Guest message handling failed: {:?}",
1764 e
1765 );
1766
1767 let error_response = actr_protocol::ErrorResponse {
1769 code: protocol_error_to_code(&e),
1770 message: e.to_string(),
1771 };
1772 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1773 let mut error_envelope = RpcEnvelope {
1774 route_key: envelope.route_key.clone(),
1775 payload: None,
1776 error: Some(error_response),
1777 traceparent: envelope.traceparent.clone(),
1778 tracestate: envelope.tracestate.clone(),
1779 request_id: request_id.clone(),
1780 metadata: Vec::new(),
1781 timeout_ms: 30000,
1782 };
1783 #[cfg(feature = "opentelemetry")]
1785 inject_span_context_to_rpc(&span, &mut error_envelope);
1786
1787 let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1788 #[cfg(feature = "opentelemetry")]
1789 let send_error_response_fut = send_error_response_fut.instrument(span);
1790 if let Err(send_err) = send_error_response_fut.await {
1791 tracing::error!(
1792 severity = 7,
1793 error_category = "transport_error",
1794 request_id = %request_id,
1795 "❌ Failed to send ERROR response to Shell: {:?}",
1796 send_err
1797 );
1798 }
1799 }
1800 }
1801 }
1802 Err(e) => {
1803 tracing::error!(
1804 severity = 8,
1805 error_category = "transport_error",
1806 "❌ Failed to receive from Shell → Guest lane: {:?}",
1807 e
1808 );
1809 break;
1810 }
1811 }
1812 }
1813 }
1814 }
1815 tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1816 });
1817 task_handles.push(inproc_handle);
1818 }
1819 }
1820 tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1821
1822 tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1826 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1827 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1829 let response_rx_lane = workload_to_shell
1830 .get_lane(PayloadType::RpcReliable, None)
1831 .await
1832 .map_err(|e| {
1833 ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1834 })?;
1835 let request_mgr = shell_to_workload.clone();
1836 let shutdown = shutdown_token.clone();
1837
1838 let shell_receive_handle = tokio::spawn(async move {
1839 loop {
1840 tokio::select! {
1841 _ = shutdown.cancelled() => {
1842 tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1843 break;
1844 }
1845 envelope_result = response_rx_lane.recv_envelope() => {
1846 match envelope_result {
1847 Ok(envelope) => {
1848 tracing::debug!(
1849 "📨 Shell received RESPONSE from Guest: request_id={}",
1850 envelope.request_id
1851 );
1852
1853 match (envelope.payload, envelope.error) {
1855 (Some(payload), None) => {
1856 if let Err(e) = request_mgr
1858 .complete_response(&envelope.request_id, payload)
1859 .await
1860 {
1861 tracing::warn!(
1862 severity = 4,
1863 error_category = "orphan_response",
1864 request_id = %envelope.request_id,
1865 "⚠️ No pending request found for response: {:?}",
1866 e
1867 );
1868 }
1869 }
1870 (None, Some(error)) => {
1871 let actr_err = ActrError::Unavailable(format!("RPC error {}: {}", error.code, error.message));
1873 if let Err(e) = request_mgr
1874 .complete_error(&envelope.request_id, actr_err)
1875 .await
1876 {
1877 tracing::warn!(
1878 severity = 4,
1879 error_category = "orphan_response",
1880 request_id = %envelope.request_id,
1881 "⚠️ No pending request found for error response: {:?}",
1882 e
1883 );
1884 }
1885 }
1886 _ => {
1887 tracing::error!(
1888 severity = 7,
1889 error_category = "protocol_error",
1890 request_id = %envelope.request_id,
1891 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1892 );
1893 }
1894 }
1895 }
1896 Err(e) => {
1897 tracing::error!(
1898 severity = 8,
1899 error_category = "transport_error",
1900 "❌ Failed to receive from Guest → Shell lane: {:?}",
1901 e
1902 );
1903 break;
1904 }
1905 }
1906 }
1907 }
1908 }
1909 tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
1910 });
1911 task_handles.push(shell_receive_handle);
1912 }
1913 }
1914 tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
1915
1916 let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
1931 {
1932 use std::sync::atomic::{AtomicBool, Ordering};
1933 let mailbox = node_ref.mailbox.clone();
1934 let shutdown = shutdown_token.clone();
1935 let hook_cb = node_hook_callback.clone();
1936 let triggered = Arc::new(AtomicBool::new(false));
1937
1938 let fire_if_rising = {
1941 let triggered = triggered.clone();
1942 let hook_cb = hook_cb.clone();
1943 Arc::new(move |queue_len: usize| {
1944 if queue_len >= backpressure_threshold {
1945 if !triggered.swap(true, Ordering::AcqRel) {
1946 if let Some(cb) = hook_cb.as_ref() {
1947 let cb = cb.clone();
1948 tokio::spawn(async move {
1949 cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
1950 queue_len,
1951 threshold: backpressure_threshold,
1952 })
1953 .await;
1954 });
1955 } else {
1956 tracing::warn!(
1957 queue_len,
1958 threshold = backpressure_threshold,
1959 "mailbox backpressure",
1960 );
1961 }
1962 }
1963 } else if triggered.swap(false, Ordering::AcqRel) {
1964 tracing::info!(
1965 queue_len,
1966 threshold = backpressure_threshold,
1967 "mailbox backpressure cleared",
1968 );
1969 }
1970 })
1971 };
1972
/// Thin adapter that carries a shared, thread-safe callback taking a
/// queue length, so the callback can be handed to the mailbox as an
/// observer object.
struct EnqueueObserver {
    /// Callback invoked with the current number of queued messages.
    /// (`Arc<dyn …>` already implies a `'static` object bound.)
    fire: Arc<dyn Fn(usize) + Send + Sync>,
}
1979 impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
1980 fn on_depth_change(&self, queued_messages: usize) {
1981 (self.fire)(queued_messages);
1982 }
1983 }
1984
1985 let installed = {
1986 let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
1987 Arc::new(EnqueueObserver {
1988 fire: fire_if_rising.clone(),
1989 });
1990 mailbox.set_depth_observer(observer)
1991 };
1992
1993 if installed {
1994 tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
1995 } else {
1996 tracing::debug!(
1997 "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
1998 );
1999 let mailbox_for_poll = mailbox.clone();
2000 let shutdown_for_poll = shutdown.clone();
2001 let fire_for_poll = fire_if_rising.clone();
2002 let watchdog_handle = tokio::spawn(async move {
2003 let mut ticker = tokio::time::interval(Duration::from_secs(1));
2004 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2005 loop {
2006 tokio::select! {
2007 _ = shutdown_for_poll.cancelled() => {
2008 tracing::debug!(
2009 "mailbox backpressure watchdog shutting down"
2010 );
2011 break;
2012 }
2013 _ = ticker.tick() => {
2014 let status = match mailbox_for_poll.status().await {
2015 Ok(s) => s,
2016 Err(e) => {
2017 tracing::debug!(?e, "mailbox status poll failed");
2018 continue;
2019 }
2020 };
2021 fire_for_poll(status.queued_messages as usize);
2022 }
2023 }
2024 }
2025 });
2026 task_handles.push(watchdog_handle);
2027 }
2028 }
2029
2030 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2034 {
2035 let node = node_ref.clone();
2036 let mailbox = node_ref.mailbox.clone();
2037 let gate = node_ref.webrtc_gate.clone();
2038 let shutdown = shutdown_token.clone();
2039
2040 let mailbox_handle = tokio::spawn(async move {
2041 loop {
2042 tokio::select! {
2043 _ = shutdown.cancelled() => {
2045 tracing::info!("📭 Mailbox loop received shutdown signal");
2046 break;
2047 }
2048 result = mailbox.dequeue() => {
2050 match result {
2051 Ok(messages) => {
2052 if messages.is_empty() {
2053 tokio::time::sleep(Duration::from_millis(10)).await;
2055 continue;
2056 }
2057 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2058
2059 for msg_record in messages {
2061 match RpcEnvelope::decode(&msg_record.payload[..]) {
2063 Ok(envelope) => {
2064 let request_id = envelope.request_id.clone();
2065 let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2066 tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2067
2068 tracing::debug!("📦 Processing message: request_id={}", request_id);
2069 #[cfg(feature = "opentelemetry")]
2070 let span = {
2071 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2072 let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2073 set_parent_from_rpc_envelope(&span, &envelope);
2074 span
2075 };
2076
2077 let caller_id_result = ActrId::decode(&msg_record.from[..]);
2079 let caller_id_ref = caller_id_result.as_ref().ok();
2080
2081 if caller_id_ref.is_none() {
2082 tracing::warn!(
2083 request_id = %request_id,
2084 "⚠️ Failed to decode caller_id from MessageRecord.from"
2085 );
2086 }
2087
2088 let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2090 #[cfg(feature = "opentelemetry")]
2091 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2092
2093 match handle_incoming_fut.await {
2094 Ok(response_bytes) => {
2095 if let Some(ref gate) = gate {
2097 match caller_id_result {
2099 Ok(caller) => {
2100 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2102 let mut response_envelope = RpcEnvelope {
2103 request_id, route_key: envelope.route_key.clone(),
2105 payload: Some(response_bytes),
2106 error: None,
2107 traceparent: envelope.traceparent.clone(),
2108 tracestate: envelope.tracestate.clone(),
2109 metadata: Vec::new(), timeout_ms: 30000,
2111 };
2112 #[cfg(feature = "opentelemetry")]
2114 inject_span_context_to_rpc(&span, &mut response_envelope);
2115
2116 let send_response_fut = gate.send_response(&caller, response_envelope);
2117 #[cfg(feature = "opentelemetry")]
2118 let send_response_fut = send_response_fut.instrument(span);
2119 if let Err(e) = send_response_fut.await {
2120 tracing::error!(
2121 severity = 7,
2122 error_category = "transport_error",
2123 request_id = %envelope.request_id,
2124 "❌ Failed to send response: {:?}",
2125 e
2126 );
2127 }
2128 }
2129 Err(e) => {
2130 tracing::error!(
2131 severity = 8,
2132 error_category = "protobuf_decode",
2133 request_id = %envelope.request_id,
2134 "❌ Failed to decode caller_id: {:?}",
2135 e
2136 );
2137 }
2138 }
2139 }
2140
2141 if let Err(e) = mailbox.ack(msg_record.id).await {
2143 tracing::error!(
2144 severity = 9,
2145 error_category = "mailbox_error",
2146 request_id = %envelope.request_id,
2147 message_id = %msg_record.id,
2148 "❌ Mailbox ACK failed: {:?}",
2149 e
2150 );
2151 }
2152 }
2153 Err(e) => {
2154 tracing::error!(
2155 severity = 6,
2156 error_category = "handler_error",
2157 request_id = %envelope.request_id,
2158 route_key = %envelope.route_key,
2159 "❌ handle_incoming failed: {:?}", e
2160 );
2161 let _ = mailbox.ack(msg_record.id).await;
2164 }
2165 }
2166 }
2167 Err(e) => {
2168 tracing::error!(
2170 severity = 9,
2171 error_category = "protobuf_decode",
2172 message_id = %msg_record.id,
2173 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2174 e
2175 );
2176
2177 use actr_runtime_mailbox::DlqRecord;
2179 use chrono::Utc;
2180 use uuid::Uuid;
2181
2182 let dlq_record = DlqRecord {
2183 id: Uuid::new_v4(),
2184 original_message_id: Some(msg_record.id.to_string()),
2185 from: Some(msg_record.from.clone()),
2186 to: node.actor_id.as_ref().map(|id| {
2187 let mut buf = Vec::new();
2188 id.encode(&mut buf).unwrap();
2189 buf
2190 }),
2191 raw_bytes: msg_record.payload.clone(),
2192 error_message: format!("Protobuf decode failed: {e}"),
2193 error_category: "protobuf_decode".to_string(),
2194 trace_id: format!("mailbox-{}", msg_record.id),
2195 request_id: None,
2196 created_at: Utc::now(),
2197 redrive_attempts: 0,
2198 last_redrive_at: None,
2199 context: Some(format!(
2200 r#"{{"source":"mailbox","priority":"{}"}}"#,
2201 match msg_record.priority {
2202 actr_runtime_mailbox::MessagePriority::High => "high",
2203 actr_runtime_mailbox::MessagePriority::Normal => "normal",
2204 }
2205 )),
2206 };
2207
2208 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2209 tracing::error!(
2210 severity = 10,
2211 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2212 dlq_err
2213 );
2214 } else {
2215 tracing::warn!(
2216 severity = 9,
2217 "☠️ Poison message moved to DLQ: message_id={}",
2218 msg_record.id
2219 );
2220 }
2221
2222 let _ = mailbox.ack(msg_record.id).await;
2224 }
2225 }
2226 }
2227 }
2228 Err(e) => {
2229 tracing::error!(
2230 severity = 9,
2231 error_category = "mailbox_error",
2232 "❌ Mailbox dequeue failed: {:?}", e
2233 );
2234 tokio::time::sleep(Duration::from_secs(1)).await;
2235 }
2236 }
2237 }
2238 }
2239 }
2240 tracing::info!("✅ Mailbox processing loop terminated gracefully");
2241 });
2242
2243 task_handles.push(mailbox_handle);
2244 }
2245 tracing::info!("✅ Mailbox processing loop started");
2246 tracing::info!("✅ ActrNode started successfully");
2247
2248 {
2249 let ready_ctx = bootstrap_ctx_builder
2250 .build_bootstrap(&actor_id, &credential_state.credential().await);
2251 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2252 let call_executor =
2253 lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2254 let mut workload = node_ref.workload_dispatch.lock().await;
2255 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2256 "on_ready",
2257 workload.on_ready(ready_ctx, invocation, &call_executor),
2258 )
2259 .await
2260 {
2261 tracing::warn!(error = %e, "workload on_ready returned Err");
2262 }
2263 }
2264
2265 let shared = Arc::new(ActrRefShared {
2267 actor_id,
2268 bootstrap_ctx_builder,
2269 credential_state,
2270 shutdown_token,
2271 task_handles: Mutex::new(task_handles),
2272 });
2273
2274 tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2276
2277 Ok(ActrRef { shared })
2278 }
2279}