1use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21 AIdCredential, ActorResult, ActrError, ActrId, PayloadType, RegisterAuthMode, RegisterRequest,
22 RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
/// Shared state of a node: configuration, storage, gates, transports and the
/// workload dispatcher.
///
/// `Inner::build` populates the storage and in-process pieces. The identity
/// fields (`actor_id`, `credential_state`) are `None` after `build` and are
/// assigned by `Inner::start` once registration has produced them.
pub(crate) struct Inner {
    /// Runtime configuration this node was built from.
    pub(crate) config: actr_config::RuntimeConfig,

    /// Message mailbox; `build` backs it with a `SqliteMailbox`.
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead-letter queue; `build` backs it with a `SqliteDeadLetterQueue`.
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// In-process gate; `build` creates it as `Gate::Host` over `shell_to_workload`.
    pub(crate) inproc_gate: Gate,

    /// Out-of-process gate; `None` straight after `build`.
    pub(crate) outproc_gate: Option<Gate>,

    /// Registry handed to every context created by this node (data streams).
    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,

    /// Registry handed to every context and to the WebRTC coordinator (media frames).
    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,

    /// Signaling client; `build` creates a `WebSocketSignalingClient`.
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// This node's id; `None` until `start` stores the registered id.
    pub(crate) actor_id: Option<ActrId>,

    /// Current credential material; `None` until `start` stores it.
    pub(crate) credential_state: Option<CredentialState>,

    /// WebRTC coordinator; `None` straight after `build`.
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    /// WebRTC gate; `None` straight after `build`.
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// WebSocket gate; `None` straight after `build`.
    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,

    /// Host transport that backs `inproc_gate` (created in `build`).
    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,

    /// Second host transport created in `build`.
    // NOTE(review): presumably the reverse (workload -> shell) direction — confirm.
    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,

    /// Cancellation token created fresh in `build`.
    pub(crate) shutdown_token: CancellationToken,

    /// Packaged lock file, if one was supplied to `build`.
    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,
    /// Receiving end of the network-event channel; set by
    /// `create_network_event_handle`.
    pub(crate) network_event_rx:
        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>>,

    /// Sending end of the network-event result channel; set by
    /// `create_network_event_handle`.
    pub(crate) network_event_result_tx:
        Option<tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>>,

    /// Debounce settings; `Some` only when `create_network_event_handle` was
    /// called with a non-zero `debounce_ms`.
    pub(crate) network_event_debounce_config:
        Option<crate::lifecycle::network_event::DebounceConfig>,

    /// Request-id de-duplication state consulted by `handle_incoming`.
    pub(crate) dedup_state: Arc<Mutex<DedupState>>,

    /// Package manifest supplied to `build`, if any.
    #[allow(dead_code)]
    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,

    /// Registration result injected via `set_preregistered_credential`; `start`
    /// takes it and skips AIS registration when present.
    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,

    /// Map from actor id to an address string, shared with the wire builder in `start`.
    // NOTE(review): presumably WebSocket URLs learned via discovery — confirm.
    pub(crate) discovered_ws_addresses:
        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,

    /// The workload; locked for the duration of each dispatch.
    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,

    /// Optional hook observer passed to `build_hook_callback` in `start`;
    /// `None` straight after `build`.
    #[allow(dead_code)]
    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,

    /// Threshold value supplied to `build`.
    // NOTE(review): units/semantics are not visible in this file — confirm.
    pub(crate) mailbox_backpressure_threshold: usize,

    /// Duration supplied to `build`.
    // NOTE(review): presumably the lead time for credential-expiry warnings — confirm.
    #[allow(dead_code)]
    pub(crate) credential_expiry_warning: Duration,
}
176
/// Cloneable handle to the node's current credential material.
///
/// All clones share the same `Arc<RwLock<..>>`, so an `update` made through
/// one clone is observed by every other clone.
#[derive(Clone)]
pub struct CredentialState {
    /// Shared, lock-protected credential data.
    inner: Arc<RwLock<CredentialStateInner>>,
}
182
/// The data guarded by [`CredentialState`]'s lock.
#[derive(Clone)]
struct CredentialStateInner {
    /// The identity credential itself.
    credential: AIdCredential,
    /// Expiry timestamp of `credential`, when one was provided.
    expires_at: Option<prost_types::Timestamp>,
    /// TURN credential, when one was provided.
    turn_credential: Option<TurnCredential>,
}
190
191impl CredentialState {
192 pub fn new(
194 credential: AIdCredential,
195 expires_at: Option<prost_types::Timestamp>,
196 turn_credential: Option<TurnCredential>,
197 ) -> Self {
198 Self {
199 inner: Arc::new(RwLock::new(CredentialStateInner {
200 credential,
201 expires_at,
202 turn_credential,
203 })),
204 }
205 }
206
207 pub async fn credential(&self) -> AIdCredential {
208 self.inner.read().await.credential.clone()
209 }
210
211 pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
212 self.inner.read().await.expires_at
213 }
214
215 pub async fn turn_credential(&self) -> Option<TurnCredential> {
217 self.inner.read().await.turn_credential.clone()
218 }
219
220 pub(crate) async fn update(
224 &self,
225 credential: AIdCredential,
226 expires_at: Option<prost_types::Timestamp>,
227 turn_credential: Option<TurnCredential>,
228 ) {
229 let mut guard = self.inner.write().await;
230 guard.credential = credential;
231 guard.expires_at = expires_at;
232 if turn_credential.is_some() {
233 guard.turn_credential = turn_credential;
234 }
235 }
236}
237
238async fn host_operation_handler(
242 ctx: crate::context::RuntimeContext,
243 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
244 pending: crate::workload::HostOperation,
245) -> crate::workload::HostOperationResult {
246 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
247 use actr_framework::guest::dynclib_abi::code as abi_code;
248 use actr_framework::{Context as _, Dest};
249 use actr_protocol::{DataStream, PayloadType};
250
251 fn actr_error_to_code(err: &ActrError) -> i32 {
253 match err {
254 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
255 _ => abi_code::GENERIC_ERROR,
256 }
257 }
258
259 match pending {
260 HostOperation::CallRaw(req) => {
261 match ctx
262 .call_raw(
263 &Dest::Actor(req.target),
264 req.route_key,
265 PayloadType::RpcReliable,
266 bytes::Bytes::from(req.payload),
267 30_000,
268 )
269 .await
270 {
271 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
272 Err(e) => {
273 tracing::error!("call_raw routing failed: {e:?}");
274 HostOperationResult::Error(actr_error_to_code(&e))
275 }
276 }
277 }
278
279 HostOperation::Call(req) => {
280 let dest = match decode_dest(&req.dest) {
281 Some(d) => d,
282 None => {
283 tracing::error!(route_key = req.route_key, "call: dest decode failed");
284 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
285 }
286 };
287 match ctx
288 .call_raw(
289 &dest,
290 req.route_key,
291 PayloadType::RpcReliable,
292 bytes::Bytes::from(req.payload),
293 30_000,
294 )
295 .await
296 {
297 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
298 Err(e) => {
299 tracing::error!("call routing failed: {e:?}");
300 HostOperationResult::Error(actr_error_to_code(&e))
301 }
302 }
303 }
304
305 HostOperation::Tell(req) => {
306 let dest = match decode_dest(&req.dest) {
307 Some(d) => d,
308 None => {
309 tracing::error!(route_key = req.route_key, "tell: dest decode failed");
310 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
311 }
312 };
313 match ctx
314 .tell_raw(
315 &dest,
316 req.route_key,
317 PayloadType::RpcReliable,
318 bytes::Bytes::from(req.payload),
319 )
320 .await
321 {
322 Ok(()) => HostOperationResult::Done,
323 Err(e) => {
324 tracing::error!("tell routing failed: {e:?}");
325 HostOperationResult::Error(actr_error_to_code(&e))
326 }
327 }
328 }
329
330 HostOperation::Discover(req) => {
331 match ctx.discover_route_candidate(&req.target_type).await {
332 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
333 Err(e) => {
334 tracing::error!("discover failed: {e:?}");
335 HostOperationResult::Error(actr_error_to_code(&e))
336 }
337 }
338 }
339
340 HostOperation::RegisterStream(req) => {
341 let stream_id = req.stream_id;
342 let callback_ctx = ctx.clone();
343 let callback_workload_dispatch = workload_dispatch.clone();
344 match ctx
345 .register_stream(stream_id, move |chunk: DataStream, sender| {
346 let ctx_for_executor = callback_ctx.clone();
347 let workload_dispatch = callback_workload_dispatch.clone();
348 Box::pin(async move {
349 let invocation = crate::workload::InvocationContext {
350 self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
351 caller_id: Some(sender.clone()),
352 request_id: format!(
353 "data-stream:{}:{}",
354 chunk.stream_id, chunk.sequence
355 ),
356 };
357 let call_executor: crate::workload::HostAbiFn =
358 std::sync::Arc::new(move |pending| {
359 let ctx = ctx_for_executor.clone();
360 Box::pin(async move {
361 stream_callback_host_operation_handler(ctx, pending).await
362 })
363 });
364 let mut guard = workload_dispatch.lock().await;
365 guard
366 .dispatch_data_stream(chunk, sender, invocation, &call_executor)
367 .await
368 })
369 })
370 .await
371 {
372 Ok(()) => HostOperationResult::Done,
373 Err(e) => {
374 tracing::error!("register_stream failed: {e:?}");
375 HostOperationResult::Error(actr_error_to_code(&e))
376 }
377 }
378 }
379
380 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
381 Ok(()) => HostOperationResult::Done,
382 Err(e) => {
383 tracing::error!("unregister_stream failed: {e:?}");
384 HostOperationResult::Error(actr_error_to_code(&e))
385 }
386 },
387
388 HostOperation::SendDataStream(req) => {
389 let dest = match decode_dest(&req.dest) {
390 Some(d) => d,
391 None => {
392 tracing::error!("send_data_stream: dest decode failed");
393 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
394 }
395 };
396 let payload_type = match PayloadType::try_from(req.payload_type) {
397 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
398 PayloadType::try_from(req.payload_type).expect("checked payload type")
399 }
400 Ok(other) => {
401 tracing::error!(?other, "send_data_stream: invalid stream payload type");
402 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
403 }
404 Err(_) => {
405 tracing::error!(
406 payload_type = req.payload_type,
407 "send_data_stream: unknown payload type"
408 );
409 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
410 }
411 };
412 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
413 Ok(()) => HostOperationResult::Done,
414 Err(e) => {
415 tracing::error!("send_data_stream failed: {e:?}");
416 HostOperationResult::Error(actr_error_to_code(&e))
417 }
418 }
419 }
420 }
421}
422
423fn lifecycle_invocation(
424 actor_id: &ActrId,
425 request_id: &'static str,
426) -> crate::workload::InvocationContext {
427 crate::workload::InvocationContext {
428 self_id: actor_id.clone(),
429 caller_id: None,
430 request_id: request_id.to_string(),
431 }
432}
433
434pub(crate) fn lifecycle_host_abi(
435 ctx: crate::context::RuntimeContext,
436 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
437) -> crate::workload::HostAbiFn {
438 std::sync::Arc::new(move |pending| {
439 let ctx = ctx.clone();
440 let workload_dispatch = workload_dispatch.clone();
441 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
442 })
443}
444
445async fn stream_callback_host_operation_handler(
446 ctx: crate::context::RuntimeContext,
447 pending: crate::workload::HostOperation,
448) -> crate::workload::HostOperationResult {
449 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
450 use actr_framework::guest::dynclib_abi::code as abi_code;
451 use actr_framework::{Context as _, Dest};
452 use actr_protocol::PayloadType;
453
454 fn actr_error_to_code(err: &ActrError) -> i32 {
455 match err {
456 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
457 _ => abi_code::GENERIC_ERROR,
458 }
459 }
460
461 match pending {
462 HostOperation::CallRaw(req) => {
463 match ctx
464 .call_raw(
465 &Dest::Actor(req.target),
466 req.route_key,
467 PayloadType::RpcReliable,
468 bytes::Bytes::from(req.payload),
469 30_000,
470 )
471 .await
472 {
473 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
474 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
475 }
476 }
477 HostOperation::Call(req) => {
478 let dest = match decode_dest(&req.dest) {
479 Some(d) => d,
480 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
481 };
482 match ctx
483 .call_raw(
484 &dest,
485 req.route_key,
486 PayloadType::RpcReliable,
487 bytes::Bytes::from(req.payload),
488 30_000,
489 )
490 .await
491 {
492 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
493 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
494 }
495 }
496 HostOperation::Tell(req) => {
497 let dest = match decode_dest(&req.dest) {
498 Some(d) => d,
499 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
500 };
501 match ctx
502 .tell_raw(
503 &dest,
504 req.route_key,
505 PayloadType::RpcReliable,
506 bytes::Bytes::from(req.payload),
507 )
508 .await
509 {
510 Ok(()) => HostOperationResult::Done,
511 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
512 }
513 }
514 HostOperation::Discover(req) => {
515 match ctx.discover_route_candidate(&req.target_type).await {
516 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
517 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
518 }
519 }
520 HostOperation::RegisterStream(_) => {
521 tracing::error!("register_stream from inside a stream callback is not supported");
522 HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
523 }
524 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
525 Ok(()) => HostOperationResult::Done,
526 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
527 },
528 HostOperation::SendDataStream(req) => {
529 let dest = match decode_dest(&req.dest) {
530 Some(d) => d,
531 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
532 };
533 let payload_type = match PayloadType::try_from(req.payload_type) {
534 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
535 PayloadType::try_from(req.payload_type).expect("checked payload type")
536 }
537 Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
538 };
539 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
540 Ok(()) => HostOperationResult::Done,
541 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
542 }
543 }
544 }
545}
546
547fn protocol_error_to_code(err: &ActrError) -> u32 {
549 match err {
550 ActrError::Unavailable(_) => 503, ActrError::TimedOut => 504, ActrError::NotFound(_) => 404, ActrError::PermissionDenied(_) => 403, ActrError::InvalidArgument(_) => 400, ActrError::UnknownRoute(_) => 404, ActrError::DependencyNotFound { .. } => 400, ActrError::DecodeFailure(_) => 400, ActrError::NotImplemented(_) => 501, ActrError::Internal(_) => 500, }
561}
562
563impl Inner {
564 #[allow(dead_code)]
565 pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
566 self.package_manifest.as_ref()
567 }
568
569 async fn network_event_loop(
576 event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>,
577 result_tx: tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>,
578 event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
579 shutdown_token: CancellationToken,
580 ) {
581 crate::lifecycle::network_event::run_network_event_reconciler(
582 event_rx,
583 result_tx,
584 event_processor,
585 shutdown_token,
586 )
587 .await;
588 }
589
590 fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
591 if timeout_ms > 0 {
592 Duration::from_millis(timeout_ms as u64)
593 } else {
594 DEDUP_TTL
595 }
596 }
597
598 async fn wait_for_inflight_duplicate(
599 mut waiter: DedupWaiter,
600 timeout: Duration,
601 ) -> ActorResult<Bytes> {
602 let wait_for_result = async {
603 loop {
604 if let Some(result) = waiter.borrow().clone() {
605 return result;
606 }
607
608 if waiter.changed().await.is_err() {
609 if let Some(result) = waiter.borrow().clone() {
610 return result;
611 }
612 return Err(ActrError::Unavailable(
613 "duplicate request result unavailable".to_string(),
614 ));
615 }
616 }
617 };
618
619 match tokio::time::timeout(timeout, wait_for_result).await {
620 Ok(result) => result,
621 Err(_) => Err(ActrError::Unavailable(format!(
622 "duplicate request in-flight timed out after {}ms",
623 timeout.as_millis()
624 ))),
625 }
626 }
627
628 #[cfg_attr(
631 feature = "opentelemetry",
632 tracing::instrument(
633 skip_all,
634 name = "ActrNode.handle_incoming",
635 fields(
636 actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
637 route_key = %envelope.route_key,
638 request_id = %envelope.request_id,
639 )
640 )
641 )]
642 pub async fn handle_incoming(
643 &self,
644 envelope: RpcEnvelope,
645 caller_id: Option<&ActrId>,
646 ) -> ActorResult<Bytes> {
647 if let Some(caller) = caller_id {
649 tracing::debug!(
650 "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
651 envelope.route_key,
652 caller,
653 envelope.request_id
654 );
655 } else {
656 tracing::debug!(
657 "📨 Handling incoming message: route_key={}, request_id={}",
658 envelope.route_key,
659 envelope.request_id
660 );
661 }
662
663 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
665 ActrError::Internal(
666 "Actor ID not set - node must be started before handling messages".to_string(),
667 )
668 })?;
669
670 let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
672 .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
673
674 if !acl_allowed {
675 tracing::warn!(
676 severity = 5,
677 error_category = "acl_denied",
678 request_id = %envelope.request_id,
679 route_key = %envelope.route_key,
680 caller = %caller_id
681 .map(|c| c.to_string())
682 .unwrap_or_else(|| "<none>".to_string()),
683 "🚫 ACL: Permission denied"
684 );
685
686 return Err(ActrError::PermissionDenied(format!(
687 "ACL denied: {} is not allowed to call {}",
688 caller_id
689 .map(|c| c.to_string())
690 .unwrap_or_else(|| "<unknown>".to_string()),
691 actor_id
692 )));
693 }
694
695 let outcome = {
697 self.dedup_state
698 .lock()
699 .await
700 .check_or_mark(&envelope.request_id)
701 };
702 match outcome {
703 DedupOutcome::Fresh => {} DedupOutcome::InFlight(waiter) => {
705 tracing::debug!(
706 request_id = %envelope.request_id,
707 route_key = %envelope.route_key,
708 "duplicate request in-flight; waiting for original result"
709 );
710 return Self::wait_for_inflight_duplicate(
711 waiter,
712 Self::duplicate_wait_timeout(envelope.timeout_ms),
713 )
714 .await;
715 }
716 DedupOutcome::Duplicate(cached) => {
717 tracing::debug!(
718 request_id = %envelope.request_id,
719 route_key = %envelope.route_key,
720 "♻️ returning cached response for duplicate request_id"
721 );
722 return cached;
723 }
724 }
725
726 let credential_state = self.credential_state.clone().ok_or_else(|| {
728 ActrError::Internal(
729 "Credential not set - node must be started before handling messages".to_string(),
730 )
731 })?;
732 let ctx = self.make_runtime_context(
733 actor_id,
734 caller_id, &envelope.request_id,
736 &credential_state.credential().await,
737 );
738
739 let dispatch_ctx = crate::workload::InvocationContext {
741 self_id: actor_id.clone(),
742 caller_id: caller_id.cloned(),
743 request_id: envelope.request_id.clone(),
744 };
745 let ctx_for_executor = ctx.clone();
746 let workload_for_executor = self.workload_dispatch.clone();
747 let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
748 let ctx = ctx_for_executor.clone();
749 let workload_dispatch = workload_for_executor.clone();
750 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
751 });
752
753 let mut guard = self.workload_dispatch.lock().await;
754 let result = guard
755 .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
756 .await
757 .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e:?}")));
758
759 match &result {
760 Ok(_) => tracing::debug!(
761 request_id = %envelope.request_id,
762 route_key = %envelope.route_key,
763 "✅ Message handled successfully"
764 ),
765 Err(e) => tracing::error!(
766 severity = 6,
767 error_category = "handler_error",
768 request_id = %envelope.request_id,
769 route_key = %envelope.route_key,
770 "❌ Message handling failed: {:?}", e
771 ),
772 }
773
774 self.dedup_state
776 .lock()
777 .await
778 .complete(&envelope.request_id, result.clone());
779
780 result
781 }
782
783 pub(crate) async fn build(
788 config: actr_config::RuntimeConfig,
789 workload: crate::workload::Workload,
790 package_manifest: Option<actr_pack::PackageManifest>,
791 packaged_lock: Option<actr_config::lock::LockFile>,
792 mailbox_backpressure_threshold: usize,
793 credential_expiry_warning: Duration,
794 ) -> ActorResult<Self> {
795 use crate::outbound::{Gate, HostGate};
796 use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
797
798 tracing::info!("🚀 Initializing ActrNode");
799
800 let mailbox_path = config
802 .mailbox_path
803 .as_ref()
804 .map(|p| p.to_string_lossy().to_string())
805 .unwrap_or_else(|| ":memory:".to_string());
806
807 tracing::info!("📂 Mailbox database path: {}", mailbox_path);
808
809 let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
810 actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
811 .await
812 .map_err(|e| {
813 actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
814 })?,
815 );
816
817 let dlq_path = if mailbox_path == ":memory:" {
819 ":memory:".to_string()
820 } else {
821 format!("{mailbox_path}.dlq")
822 };
823
824 let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
825 actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
826 .await
827 .map_err(|e| {
828 actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
829 })?,
830 );
831 tracing::info!("✅ Dead Letter Queue initialized");
832
833 let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
835 Some("answer".to_string())
836 } else {
837 None
838 };
839
840 let signaling_config = SignalingConfig {
841 server_url: config.signaling_url.clone(),
842 connection_timeout: 30,
843 heartbeat_interval: 30,
844 reconnect_config: ReconnectConfig::default(),
845 auth_config: None,
846 webrtc_role,
847 };
848
849 let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
850 client.start_reconnect_manager();
851 let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
852
853 let shell_to_workload = Arc::new(HostTransport::new());
855 let workload_to_shell = Arc::new(HostTransport::new());
856 let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
857
858 let data_stream_registry = Arc::new(DataStreamRegistry::new());
859 let media_frame_registry = Arc::new(MediaFrameRegistry::new());
860
861 tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
862
863 let actr_lock = if let Some(lock) = packaged_lock {
864 tracing::info!(
865 "📋 Loaded packaged manifest.lock.toml with {} dependencies",
866 lock.dependencies.len()
867 );
868 Some(Arc::new(lock))
869 } else {
870 tracing::warn!(
871 "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
872 );
873 None
874 };
875
876 tracing::info!("✅ ActrNode initialized");
877
878 Ok(Self {
879 config,
880 mailbox,
881 dlq,
882 inproc_gate,
883 outproc_gate: None, data_stream_registry,
885 media_frame_registry,
886 signaling_client,
887 actor_id: None,
888 credential_state: None,
889 webrtc_coordinator: None,
890 webrtc_gate: None,
891 websocket_gate: None,
892 shell_to_workload: Some(shell_to_workload),
893 workload_to_shell: Some(workload_to_shell),
894 shutdown_token: CancellationToken::new(),
895 actr_lock,
896 network_event_rx: None,
897 network_event_result_tx: None,
898 network_event_debounce_config: None,
899 dedup_state: Arc::new(Mutex::new(DedupState::new())),
900 package_manifest,
901 preregistered_credential: None,
902 discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
903 std::collections::HashMap::new(),
904 )),
905 workload_dispatch: Arc::new(Mutex::new(workload)),
906 hook_observer: None,
907 mailbox_backpressure_threshold,
908 credential_expiry_warning,
909 })
910 }
911
912 pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
920 BootstrapContextBuilder::new(
921 self.inproc_gate.clone(),
922 self.outproc_gate.clone(),
923 self.data_stream_registry.clone(),
924 self.media_frame_registry.clone(),
925 self.signaling_client.clone(),
926 self.actr_lock.clone(),
927 )
928 }
929
930 pub(crate) fn make_runtime_context(
935 &self,
936 self_id: &ActrId,
937 caller_id: Option<&ActrId>,
938 request_id: &str,
939 credential: &AIdCredential,
940 ) -> RuntimeContext {
941 RuntimeContext::new(
942 self_id.clone(),
943 caller_id.cloned(),
944 request_id.to_string(),
945 self.inproc_gate.clone(),
946 self.outproc_gate.clone(),
947 self.data_stream_registry.clone(),
948 self.media_frame_registry.clone(),
949 self.signaling_client.clone(),
950 credential.clone(),
951 self.actr_lock.clone(),
952 )
953 }
954
955 pub fn create_network_event_handle(
963 &mut self,
964 debounce_ms: u64,
965 ) -> crate::lifecycle::NetworkEventHandle {
966 if self.network_event_rx.is_some() {
967 panic!("create_network_event_handle() can only be called once");
968 }
969
970 let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
971 let (result_tx, result_rx) = tokio::sync::mpsc::channel(100);
972
973 let debounce_config = if debounce_ms > 0 {
974 Some(crate::lifecycle::network_event::DebounceConfig {
975 window: std::time::Duration::from_millis(debounce_ms),
976 })
977 } else {
978 None
979 };
980
981 self.network_event_rx = Some(event_rx);
982 self.network_event_result_tx = Some(result_tx);
983 self.network_event_debounce_config = debounce_config;
984
985 crate::lifecycle::NetworkEventHandle::new(event_tx, result_rx)
986 }
987
988 pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
993 tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
994 self.preregistered_credential = Some(register_ok);
995 }
996
997 pub async fn start(mut self) -> ActorResult<ActrRef> {
999 tracing::info!("🚀 Starting ActrNode");
1000 tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1001
1002 let actr_type = self.config.actr_type().clone();
1007 tracing::info!("📋 Actor type: {}", actr_type);
1008
1009 let service_spec = None;
1015
1016 let ws_address = if let Some(port) = self.config.websocket_listen_port {
1019 let host = self
1020 .config
1021 .websocket_advertised_host
1022 .as_deref()
1023 .unwrap_or("127.0.0.1");
1024 Some(format!("ws://{}:{}", host, port))
1025 } else {
1026 None
1027 };
1028
1029 if let Some(ref addr) = ws_address {
1030 tracing::info!(
1031 "📡 Advertising WebSocket address to signaling server: {}",
1032 addr
1033 );
1034 }
1035
1036 let register_request = RegisterRequest {
1037 actr_type: actr_type.clone(),
1038 realm: self.config.realm,
1039 service_spec,
1040 acl: self.config.acl.clone(),
1041 service: None,
1042 ws_address,
1043 auth_mode: Some(RegisterAuthMode::Linked as i32),
1044 ..Default::default()
1045 };
1046
1047 let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1051 tracing::info!(
1052 "Using Hyper pre-injected registration credential; skipping AIS registration"
1053 );
1054 injected
1055 } else {
1056 let ais_endpoint = &self.config.ais_endpoint;
1057 tracing::info!(
1058 ais_endpoint = %ais_endpoint,
1059 "Registering actor with AIS via HTTP"
1060 );
1061 let mut ais = AisClient::new(ais_endpoint);
1062 if let Some(ref secret) = self.config.realm_secret {
1063 ais = ais.with_realm_secret(secret);
1064 }
1065 let resp = ais
1066 .register_linked(register_request.clone())
1067 .await
1068 .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1069 match resp.result {
1070 Some(register_response::Result::Success(ok)) => {
1071 tracing::info!("✅ AIS HTTP registration successful");
1072 ok
1073 }
1074 Some(register_response::Result::Error(error)) => {
1075 tracing::error!(
1076 severity = 10,
1077 error_category = "registration_error",
1078 error_code = error.code,
1079 "❌ AIS registration failed: code={}, message={}",
1080 error.code,
1081 error.message
1082 );
1083 return Err(ActrError::Unavailable(format!(
1084 "AIS registration rejected: {} (code: {})",
1085 error.message, error.code
1086 )));
1087 }
1088 None => {
1089 tracing::error!(
1090 severity = 10,
1091 error_category = "registration_error",
1092 "❌ AIS registration response missing result"
1093 );
1094 return Err(ActrError::Unavailable(
1095 "Invalid AIS registration response: missing result".to_string(),
1096 ));
1097 }
1098 }
1099 };
1100
1101 let pre_connect_credential_state = {
1108 let actor_id = register_ok.actr_id.clone();
1109 let credential_state = CredentialState::new(
1110 register_ok.credential.clone(),
1111 register_ok.credential_expires_at,
1112 Some(register_ok.turn_credential.clone()),
1113 );
1114 self.signaling_client.set_actor_id(actor_id).await;
1115 self.signaling_client
1116 .set_credential_state(credential_state.clone())
1117 .await;
1118 credential_state
1119 };
1120
1121 {
1127 let actor_id = register_ok.actr_id.clone();
1128 let credential_state = pre_connect_credential_state.clone();
1129 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1133 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1134 let snapshot = ctx_builder_snapshot.clone();
1135 let actor_id = actor_id.clone();
1136 let credential_state = credential_state.clone();
1137 Box::pin(async move {
1138 Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1139 })
1140 });
1141 let cb = crate::lifecycle::hooks::build_hook_callback(
1142 self.hook_observer.clone(),
1143 ctx_builder,
1144 );
1145 self.signaling_client.set_hook_callback(cb);
1146 }
1147
1148 tracing::info!("📡 Connecting to signaling server (with credential)");
1149 self.signaling_client
1150 .connect()
1151 .await
1152 .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1153 tracing::info!("✅ Connected to signaling server");
1154
1155 let mut task_handles = Vec::new();
1157
1158 let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1162
1163 {
1164 let actor_id = register_ok.actr_id;
1165 let credential = register_ok.credential;
1166
1167 tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1168 tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1169 tracing::info!(
1170 "💓 Signaling heartbeat interval: {} seconds",
1171 register_ok.signaling_heartbeat_interval_secs
1172 );
1173
1174 tracing::debug!("TurnCredential received, TURN authentication ready");
1176
1177 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1178 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1179 }
1180
1181 self.actor_id = Some(actor_id.clone());
1183 let credential_state = CredentialState::new(
1184 credential,
1185 register_ok.credential_expires_at,
1186 Some(register_ok.turn_credential.clone()),
1187 );
1188 self.credential_state = Some(credential_state.clone());
1189
1190 node_hook_callback =
1201 {
1202 let actor_id_for_hook = actor_id.clone();
1203 let credential_state_for_hook = credential_state.clone();
1204 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1209 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1210 Arc::new(move || {
1211 let snapshot = ctx_builder_snapshot.clone();
1212 let actor_id = actor_id_for_hook.clone();
1213 let credential_state = credential_state_for_hook.clone();
1214 Box::pin(async move {
1215 Some(snapshot.build_bootstrap(
1216 &actor_id,
1217 &credential_state.credential().await,
1218 ))
1219 })
1220 });
1221 Some(crate::lifecycle::hooks::build_hook_callback(
1222 self.hook_observer.clone(),
1223 ctx_builder,
1224 ))
1225 };
1226
1227 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1232 let new_expiry = std::time::UNIX_EPOCH
1233 + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1234 if let Some(cb) = node_hook_callback.as_ref() {
1235 cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1236 } else {
1237 tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1238 }
1239 }
1240
1241 tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1249
1250 tracing::info!("🌐 Initializing WebRTC infrastructure");
1254
1255 let media_frame_registry = self.media_frame_registry.clone();
1256
1257 let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1259 actor_id.clone(),
1260 credential_state.clone(),
1261 self.signaling_client.clone(),
1262 self.config.webrtc.clone(),
1263 media_frame_registry,
1264 ));
1265
1266 {
1270 let actor_id_for_hook = actor_id.clone();
1271 let credential_state_for_hook = credential_state.clone();
1272 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1276 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1277 Arc::new(move || {
1278 let snapshot = ctx_builder_snapshot.clone();
1279 let actor_id = actor_id_for_hook.clone();
1280 let credential_state = credential_state_for_hook.clone();
1281 Box::pin(async move {
1282 Some(
1283 snapshot.build_bootstrap(
1284 &actor_id,
1285 &credential_state.credential().await,
1286 ),
1287 )
1288 })
1289 });
1290 let cb = crate::lifecycle::hooks::build_hook_callback(
1291 self.hook_observer.clone(),
1292 ctx_builder,
1293 );
1294 coordinator.set_hook_callback(cb);
1295 }
1296
1297 tracing::info!("🏗️ Creating PeerTransport with WebRTC support");
1301
1302 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1304
1305 let local_id_hex = hex::encode(actor_id.encode_to_vec());
1308 let wire_builder_config = DefaultWireBuilderConfig {
1309 local_id_hex,
1310 enable_webrtc: true,
1311 enable_websocket: true,
1312 discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1315 credential_state: Some(credential_state.clone()),
1318 };
1319 let wire_builder = Arc::new(DefaultWireBuilder::new(
1320 Some(coordinator.clone()),
1321 wire_builder_config,
1322 ));
1323
1324 use crate::transport::PeerTransport;
1326 let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1327
1328 use crate::outbound::PeerGate;
1330 let outproc_gate =
1331 Arc::new(PeerGate::new(transport_manager, Some(coordinator.clone())));
1332 let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1333 tracing::info!("PeerTransport + PeerGate initialized");
1334
1335 let data_stream_registry = self.data_stream_registry.clone();
1336
1337 let pending_requests = outproc_gate.get_pending_requests();
1339 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1340 coordinator.clone(),
1341 pending_requests,
1342 data_stream_registry.clone(),
1343 ));
1344 gate.set_local_id(actor_id.clone()).await;
1346 tracing::info!(
1347 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1348 );
1349
1350 tracing::info!("🔧 Wiring outproc_gate into node");
1357 self.outproc_gate = Some(outproc_gate_enum);
1358 tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1359
1360 self.webrtc_coordinator = Some(coordinator.clone());
1362 self.webrtc_gate = Some(gate.clone());
1363 tracing::info!("✅ WebRTC infrastructure initialized");
1364
1365 {
1369 let startup_ctx = self
1370 .bootstrap_ctx_builder()
1371 .build_bootstrap(&actor_id, &credential_state.credential().await);
1372 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1373 let call_executor =
1374 lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1375 let mut workload = self.workload_dispatch.lock().await;
1376 crate::lifecycle::hooks::call_lifecycle_hook(
1377 "on_start",
1378 workload.on_start(startup_ctx, invocation, &call_executor),
1379 )
1380 .await?;
1381 }
1382
1383 if let Some(listen_port) = self.config.websocket_listen_port {
1387 tracing::info!(
1388 "🔌 WebSocket direct-connect mode enabled, binding port {}",
1389 listen_port
1390 );
1391 use crate::key_cache::AisKeyCache;
1392 use crate::wire::websocket::gate::WsAuthContext;
1393 use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1394
1395 let ais_key_cache = AisKeyCache::new();
1397 if !register_ok.signing_pubkey.is_empty() {
1398 match ais_key_cache
1399 .seed(register_ok.signing_key_id, ®ister_ok.signing_pubkey)
1400 .await
1401 {
1402 Ok(()) => tracing::info!(
1403 key_id = register_ok.signing_key_id,
1404 "🔑 AisKeyCache seeded from RegisterOk"
1405 ),
1406 Err(e) => tracing::warn!(
1407 key_id = register_ok.signing_key_id,
1408 error = ?e,
1409 "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1410 ),
1411 }
1412 } else {
1413 tracing::warn!(
1414 "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1415 );
1416 }
1417
1418 let auth_ctx = WsAuthContext {
1419 ais_key_cache,
1420 actor_id: actor_id.clone(),
1421 credential_state: credential_state.clone(),
1422 signaling_client: self.signaling_client.clone(),
1423 };
1424
1425 match WebSocketServer::bind(listen_port).await {
1426 Ok((ws_server, conn_rx)) => {
1427 ws_server.start(self.shutdown_token.clone());
1428 let ws_gate = Arc::new(WebSocketGate::new(
1429 conn_rx,
1430 outproc_gate.get_pending_requests(),
1431 data_stream_registry.clone(),
1432 Some(auth_ctx),
1433 ));
1434
1435 {
1437 let actor_id_for_hook = actor_id.clone();
1438 let credential_state_for_hook = credential_state.clone();
1439 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1443 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1444 Arc::new(move || {
1445 let snapshot = ctx_builder_snapshot.clone();
1446 let actor_id = actor_id_for_hook.clone();
1447 let credential_state = credential_state_for_hook.clone();
1448 Box::pin(async move {
1449 Some(snapshot.build_bootstrap(
1450 &actor_id,
1451 &credential_state.credential().await,
1452 ))
1453 })
1454 });
1455 let cb = crate::lifecycle::hooks::build_hook_callback(
1456 self.hook_observer.clone(),
1457 ctx_builder,
1458 );
1459 ws_gate.set_hook_callback(cb);
1460 }
1461
1462 self.websocket_gate = Some(ws_gate);
1463 tracing::info!(
1464 "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1465 );
1466 }
1467 Err(e) => {
1468 tracing::error!(
1469 "❌ Failed to bind WebSocket server on port {}: {:?}",
1470 listen_port,
1471 e
1472 );
1473 }
1474 }
1475 }
1476
1477 {
1486 let shutdown = self.shutdown_token.clone();
1487 let client = self.signaling_client.clone();
1488 let actor_id_for_heartbeat = actor_id.clone();
1489 let credential_state_for_heartbeat = credential_state.clone();
1490 let mailbox_for_heartbeat = self.mailbox.clone();
1491 let register_request_for_heartbeat = register_request.clone();
1492
1493 let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1495 let heartbeat_interval = if heartbeat_interval_secs > 0 {
1496 Duration::from_secs(heartbeat_interval_secs as u64)
1497 } else {
1498 Duration::from_secs(30)
1499 };
1500 let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1501 let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1502 shutdown,
1503 client,
1504 actor_id_for_heartbeat,
1505 credential_state_for_heartbeat,
1506 mailbox_for_heartbeat,
1507 heartbeat_interval,
1508 register_request_for_heartbeat,
1509 ais_endpoint_for_heartbeat,
1510 node_hook_callback.clone(),
1511 ));
1512 task_handles.push(heartbeat_handle);
1513 }
1514 tracing::info!(
1515 "✅ Heartbeat task started (interval: {}s)",
1516 register_ok.signaling_heartbeat_interval_secs
1517 );
1518
1519 if let (Some(event_rx), Some(result_tx)) = (
1523 self.network_event_rx.take(),
1524 self.network_event_result_tx.take(),
1525 ) {
1526 use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1527
1528 let event_processor =
1531 if let Some(config) = self.network_event_debounce_config.clone() {
1532 Arc::new(DefaultNetworkEventProcessor::new_with_debounce(
1533 self.signaling_client.clone(),
1534 self.webrtc_coordinator.clone(),
1535 config,
1536 ))
1537 } else {
1538 Arc::new(DefaultNetworkEventProcessor::new(
1539 self.signaling_client.clone(),
1540 self.webrtc_coordinator.clone(),
1541 ))
1542 };
1543
1544 let shutdown = self.shutdown_token.clone();
1545 let network_event_handle = tokio::spawn(async move {
1546 Self::network_event_loop(event_rx, result_tx, event_processor, shutdown).await;
1547 });
1548 task_handles.push(network_event_handle);
1549 tracing::info!("✅ Network event loop started");
1550 }
1551
1552 {
1553 let shutdown = self.shutdown_token.clone();
1564 let client = self.signaling_client.clone();
1565 let actor_id_for_unreg = actor_id.clone();
1566 let credential_state_for_unreg = credential_state.clone();
1567 let webrtc_coordinator = self.webrtc_coordinator.clone();
1568
1569 let unregister_handle = tokio::spawn(async move {
1570 shutdown.cancelled().await;
1572 tracing::info!(
1573 "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1574 actor_id_for_unreg
1575 );
1576
1577 if let Some(coord) = webrtc_coordinator {
1579 if let Err(e) = coord.close_all_peers().await {
1580 tracing::warn!(
1581 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1582 e
1583 );
1584 } else {
1585 tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1586 }
1587 } else {
1588 tracing::debug!(
1589 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1590 );
1591 }
1592
1593 let result = tokio::time::timeout(
1595 Duration::from_secs(5),
1596 client.send_unregister_request(
1597 actor_id_for_unreg.clone(),
1598 credential_state_for_unreg.credential().await,
1599 Some("Graceful shutdown".to_string()),
1600 ),
1601 )
1602 .await;
1603 tracing::info!("UnregisterRequest result: {:?}", result);
1604 match result {
1605 Ok(Ok(_)) => {
1606 tracing::info!(
1607 "✅ UnregisterRequest sent to signaling server for Actor {}",
1608 actor_id_for_unreg
1609 );
1610 }
1611 Ok(Err(e)) => {
1612 tracing::warn!(
1613 "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1614 actor_id_for_unreg,
1615 e
1616 );
1617 }
1618 Err(_) => {
1619 tracing::warn!(
1620 "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1621 actor_id_for_unreg
1622 );
1623 }
1624 }
1625 });
1626
1627 task_handles.push(unregister_handle);
1628 }
1629 } tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1635
1636 let actor_id = self
1641 .actor_id
1642 .as_ref()
1643 .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1644 .clone();
1645 let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1649 let credential_state = self
1650 .credential_state
1651 .clone()
1652 .expect("CredentialState must be initialized in start()");
1653 let shutdown_token = self.shutdown_token.clone();
1654 let node_ref = Arc::new(self);
1655
1656 {
1660 let node = node_ref.clone();
1661 let actor_id = actor_id.clone();
1662 let credential_state = credential_state.clone();
1663 let shutdown = shutdown_token.clone();
1664 let on_stop_handle = tokio::spawn(async move {
1665 shutdown.cancelled().await;
1666 let stop_ctx = node
1667 .bootstrap_ctx_builder()
1668 .build_bootstrap(&actor_id, &credential_state.credential().await);
1669 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1670 let call_executor =
1671 lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1672 let mut workload = node.workload_dispatch.lock().await;
1673 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1674 "on_stop",
1675 workload.on_stop(stop_ctx, invocation, &call_executor),
1676 )
1677 .await
1678 {
1679 tracing::warn!(error = %e, "workload on_stop returned Err");
1680 }
1681 });
1682 task_handles.push(on_stop_handle);
1683 }
1684
1685 tracing::info!("🚀 Starting WebRTC background loops");
1689
1690 if let Some(coordinator) = &node_ref.webrtc_coordinator {
1692 coordinator.clone().start().await.map_err(|e| {
1693 ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1694 })?;
1695 tracing::info!("✅ WebRtcCoordinator signaling loop started");
1696 }
1697
1698 if let Some(gate) = &node_ref.webrtc_gate {
1700 gate.start_receive_loop(node_ref.mailbox.clone())
1701 .await
1702 .map_err(|e| {
1703 ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1704 })?;
1705 tracing::info!("✅ WebRtcGate → Mailbox routing started");
1706 }
1707
1708 if let Some(ws_gate) = &node_ref.websocket_gate {
1710 ws_gate
1711 .start_receive_loop(node_ref.mailbox.clone())
1712 .await
1713 .map_err(|e| {
1714 ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1715 })?;
1716 tracing::info!("✅ WebSocketGate → Mailbox routing started");
1717 }
1718 tracing::info!("✅ WebRTC background loops started");
1719
1720 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1724 tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1725 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1727 let node = node_ref.clone();
1728 let request_rx_lane = shell_to_workload
1729 .get_lane(PayloadType::RpcReliable, None)
1730 .await
1731 .map_err(|e| {
1732 ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1733 })?;
1734 let response_tx = workload_to_shell.clone();
1735 let shutdown = shutdown_token.clone();
1736
1737 let inproc_handle = tokio::spawn(async move {
1738 loop {
1739 tokio::select! {
1740 _ = shutdown.cancelled() => {
1741 tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1742 break;
1743 }
1744 envelope_result = request_rx_lane.recv_envelope() => {
1745 match envelope_result {
1746 Ok(envelope) => {
1747 let request_id = envelope.request_id.clone();
1748 tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1749 #[cfg(feature = "opentelemetry")]
1751 let span = {
1752 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1753 let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1754 set_parent_from_rpc_envelope(&span, &envelope);
1755 span
1756 };
1757
1758 let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1760 #[cfg(feature = "opentelemetry")]
1761 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1762
1763 match handle_incoming_fut.await {
1764 Ok(response_bytes) => {
1765 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1768 let mut response_envelope = RpcEnvelope {
1769 route_key: envelope.route_key.clone(),
1770 payload: Some(response_bytes),
1771 error: None,
1772 traceparent: None,
1773 tracestate: None,
1774 request_id: request_id.clone(),
1775 metadata: Vec::new(),
1776 timeout_ms: 30000,
1777 };
1778 #[cfg(feature = "opentelemetry")]
1780 inject_span_context_to_rpc(&span, &mut response_envelope);
1781
1782 let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1784 #[cfg(feature = "opentelemetry")]
1785 let send_response_fut = send_response_fut.instrument(span.clone());
1786 if let Err(e) = send_response_fut.await {
1787 tracing::error!(
1788 severity = 7,
1789 error_category = "transport_error",
1790 request_id = %request_id,
1791 "❌ Failed to send RESPONSE to Shell: {:?}",
1792 e
1793 );
1794 }
1795 }
1796 Err(e) => {
1797 tracing::error!(
1798 severity = 6,
1799 error_category = "handler_error",
1800 request_id = %request_id,
1801 route_key = %envelope.route_key,
1802 "❌ Guest message handling failed: {:?}",
1803 e
1804 );
1805
1806 let error_response = actr_protocol::ErrorResponse {
1808 code: protocol_error_to_code(&e),
1809 message: e.to_string(),
1810 };
1811 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1812 let mut error_envelope = RpcEnvelope {
1813 route_key: envelope.route_key.clone(),
1814 payload: None,
1815 error: Some(error_response),
1816 traceparent: envelope.traceparent.clone(),
1817 tracestate: envelope.tracestate.clone(),
1818 request_id: request_id.clone(),
1819 metadata: Vec::new(),
1820 timeout_ms: 30000,
1821 };
1822 #[cfg(feature = "opentelemetry")]
1824 inject_span_context_to_rpc(&span, &mut error_envelope);
1825
1826 let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1827 #[cfg(feature = "opentelemetry")]
1828 let send_error_response_fut = send_error_response_fut.instrument(span);
1829 if let Err(send_err) = send_error_response_fut.await {
1830 tracing::error!(
1831 severity = 7,
1832 error_category = "transport_error",
1833 request_id = %request_id,
1834 "❌ Failed to send ERROR response to Shell: {:?}",
1835 send_err
1836 );
1837 }
1838 }
1839 }
1840 }
1841 Err(e) => {
1842 tracing::error!(
1843 severity = 8,
1844 error_category = "transport_error",
1845 "❌ Failed to receive from Shell → Guest lane: {:?}",
1846 e
1847 );
1848 break;
1849 }
1850 }
1851 }
1852 }
1853 }
1854 tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1855 });
1856 task_handles.push(inproc_handle);
1857 }
1858 }
1859 tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1860
1861 tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1865 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1866 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1868 let response_rx_lane = workload_to_shell
1869 .get_lane(PayloadType::RpcReliable, None)
1870 .await
1871 .map_err(|e| {
1872 ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1873 })?;
1874 let request_mgr = shell_to_workload.clone();
1875 let shutdown = shutdown_token.clone();
1876
1877 let shell_receive_handle = tokio::spawn(async move {
1878 loop {
1879 tokio::select! {
1880 _ = shutdown.cancelled() => {
1881 tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1882 break;
1883 }
1884 envelope_result = response_rx_lane.recv_envelope() => {
1885 match envelope_result {
1886 Ok(envelope) => {
1887 tracing::debug!(
1888 "📨 Shell received RESPONSE from Guest: request_id={}",
1889 envelope.request_id
1890 );
1891
1892 match (envelope.payload, envelope.error) {
1894 (Some(payload), None) => {
1895 if let Err(e) = request_mgr
1897 .complete_response(&envelope.request_id, payload)
1898 .await
1899 {
1900 tracing::warn!(
1901 severity = 4,
1902 error_category = "orphan_response",
1903 request_id = %envelope.request_id,
1904 "⚠️ No pending request found for response: {:?}",
1905 e
1906 );
1907 }
1908 }
1909 (None, Some(error)) => {
1910 let actr_err = ActrError::Unavailable(format!("RPC error {}: {}", error.code, error.message));
1912 if let Err(e) = request_mgr
1913 .complete_error(&envelope.request_id, actr_err)
1914 .await
1915 {
1916 tracing::warn!(
1917 severity = 4,
1918 error_category = "orphan_response",
1919 request_id = %envelope.request_id,
1920 "⚠️ No pending request found for error response: {:?}",
1921 e
1922 );
1923 }
1924 }
1925 _ => {
1926 tracing::error!(
1927 severity = 7,
1928 error_category = "protocol_error",
1929 request_id = %envelope.request_id,
1930 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1931 );
1932 }
1933 }
1934 }
1935 Err(e) => {
1936 tracing::error!(
1937 severity = 8,
1938 error_category = "transport_error",
1939 "❌ Failed to receive from Guest → Shell lane: {:?}",
1940 e
1941 );
1942 break;
1943 }
1944 }
1945 }
1946 }
1947 }
1948 tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
1949 });
1950 task_handles.push(shell_receive_handle);
1951 }
1952 }
1953 tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
1954
1955 let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
1970 {
1971 use std::sync::atomic::{AtomicBool, Ordering};
1972 let mailbox = node_ref.mailbox.clone();
1973 let shutdown = shutdown_token.clone();
1974 let hook_cb = node_hook_callback.clone();
1975 let triggered = Arc::new(AtomicBool::new(false));
1976
1977 let fire_if_rising = {
1980 let triggered = triggered.clone();
1981 let hook_cb = hook_cb.clone();
1982 Arc::new(move |queue_len: usize| {
1983 if queue_len >= backpressure_threshold {
1984 if !triggered.swap(true, Ordering::AcqRel) {
1985 if let Some(cb) = hook_cb.as_ref() {
1986 let cb = cb.clone();
1987 tokio::spawn(async move {
1988 cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
1989 queue_len,
1990 threshold: backpressure_threshold,
1991 })
1992 .await;
1993 });
1994 } else {
1995 tracing::warn!(
1996 queue_len,
1997 threshold = backpressure_threshold,
1998 "mailbox backpressure",
1999 );
2000 }
2001 }
2002 } else if triggered.swap(false, Ordering::AcqRel) {
2003 tracing::info!(
2004 queue_len,
2005 threshold = backpressure_threshold,
2006 "mailbox backpressure cleared",
2007 );
2008 }
2009 })
2010 };
2011
2012 struct EnqueueObserver {
2016 fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2017 }
2018 impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2019 fn on_depth_change(&self, queued_messages: usize) {
2020 (self.fire)(queued_messages);
2021 }
2022 }
2023
2024 let installed = {
2025 let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2026 Arc::new(EnqueueObserver {
2027 fire: fire_if_rising.clone(),
2028 });
2029 mailbox.set_depth_observer(observer)
2030 };
2031
2032 if installed {
2033 tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2034 } else {
2035 tracing::debug!(
2036 "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2037 );
2038 let mailbox_for_poll = mailbox.clone();
2039 let shutdown_for_poll = shutdown.clone();
2040 let fire_for_poll = fire_if_rising.clone();
2041 let watchdog_handle = tokio::spawn(async move {
2042 let mut ticker = tokio::time::interval(Duration::from_secs(1));
2043 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2044 loop {
2045 tokio::select! {
2046 _ = shutdown_for_poll.cancelled() => {
2047 tracing::debug!(
2048 "mailbox backpressure watchdog shutting down"
2049 );
2050 break;
2051 }
2052 _ = ticker.tick() => {
2053 let status = match mailbox_for_poll.status().await {
2054 Ok(s) => s,
2055 Err(e) => {
2056 tracing::debug!(?e, "mailbox status poll failed");
2057 continue;
2058 }
2059 };
2060 fire_for_poll(status.queued_messages as usize);
2061 }
2062 }
2063 }
2064 });
2065 task_handles.push(watchdog_handle);
2066 }
2067 }
2068
2069 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2073 {
2074 let node = node_ref.clone();
2075 let mailbox = node_ref.mailbox.clone();
2076 let gate = node_ref.webrtc_gate.clone();
2077 let shutdown = shutdown_token.clone();
2078
2079 let mailbox_handle = tokio::spawn(async move {
2080 loop {
2081 tokio::select! {
2082 _ = shutdown.cancelled() => {
2084 tracing::info!("📭 Mailbox loop received shutdown signal");
2085 break;
2086 }
2087 result = mailbox.dequeue() => {
2089 match result {
2090 Ok(messages) => {
2091 if messages.is_empty() {
2092 tokio::time::sleep(Duration::from_millis(10)).await;
2094 continue;
2095 }
2096 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2097
2098 for msg_record in messages {
2100 match RpcEnvelope::decode(&msg_record.payload[..]) {
2102 Ok(envelope) => {
2103 let request_id = envelope.request_id.clone();
2104 let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2105 tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2106
2107 tracing::debug!("📦 Processing message: request_id={}", request_id);
2108 #[cfg(feature = "opentelemetry")]
2109 let span = {
2110 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2111 let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2112 set_parent_from_rpc_envelope(&span, &envelope);
2113 span
2114 };
2115
2116 let caller_id_result = ActrId::decode(&msg_record.from[..]);
2118 let caller_id_ref = caller_id_result.as_ref().ok();
2119
2120 if caller_id_ref.is_none() {
2121 tracing::warn!(
2122 request_id = %request_id,
2123 "⚠️ Failed to decode caller_id from MessageRecord.from"
2124 );
2125 }
2126
2127 let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2129 #[cfg(feature = "opentelemetry")]
2130 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2131
2132 match handle_incoming_fut.await {
2133 Ok(response_bytes) => {
2134 if let Some(ref gate) = gate {
2136 match caller_id_result {
2138 Ok(caller) => {
2139 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2141 let mut response_envelope = RpcEnvelope {
2142 request_id, route_key: envelope.route_key.clone(),
2144 payload: Some(response_bytes),
2145 error: None,
2146 traceparent: envelope.traceparent.clone(),
2147 tracestate: envelope.tracestate.clone(),
2148 metadata: Vec::new(), timeout_ms: 30000,
2150 };
2151 #[cfg(feature = "opentelemetry")]
2153 inject_span_context_to_rpc(&span, &mut response_envelope);
2154
2155 let send_response_fut = gate.send_response(&caller, response_envelope);
2156 #[cfg(feature = "opentelemetry")]
2157 let send_response_fut = send_response_fut.instrument(span);
2158 if let Err(e) = send_response_fut.await {
2159 tracing::error!(
2160 severity = 7,
2161 error_category = "transport_error",
2162 request_id = %envelope.request_id,
2163 "❌ Failed to send response: {:?}",
2164 e
2165 );
2166 }
2167 }
2168 Err(e) => {
2169 tracing::error!(
2170 severity = 8,
2171 error_category = "protobuf_decode",
2172 request_id = %envelope.request_id,
2173 "❌ Failed to decode caller_id: {:?}",
2174 e
2175 );
2176 }
2177 }
2178 }
2179
2180 if let Err(e) = mailbox.ack(msg_record.id).await {
2182 tracing::error!(
2183 severity = 9,
2184 error_category = "mailbox_error",
2185 request_id = %envelope.request_id,
2186 message_id = %msg_record.id,
2187 "❌ Mailbox ACK failed: {:?}",
2188 e
2189 );
2190 }
2191 }
2192 Err(e) => {
2193 tracing::error!(
2194 severity = 6,
2195 error_category = "handler_error",
2196 request_id = %envelope.request_id,
2197 route_key = %envelope.route_key,
2198 "❌ handle_incoming failed: {:?}", e
2199 );
2200 let _ = mailbox.ack(msg_record.id).await;
2203 }
2204 }
2205 }
2206 Err(e) => {
2207 tracing::error!(
2209 severity = 9,
2210 error_category = "protobuf_decode",
2211 message_id = %msg_record.id,
2212 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2213 e
2214 );
2215
2216 use actr_runtime_mailbox::DlqRecord;
2218 use chrono::Utc;
2219 use uuid::Uuid;
2220
2221 let dlq_record = DlqRecord {
2222 id: Uuid::new_v4(),
2223 original_message_id: Some(msg_record.id.to_string()),
2224 from: Some(msg_record.from.clone()),
2225 to: node.actor_id.as_ref().map(|id| {
2226 let mut buf = Vec::new();
2227 id.encode(&mut buf).unwrap();
2228 buf
2229 }),
2230 raw_bytes: msg_record.payload.clone(),
2231 error_message: format!("Protobuf decode failed: {e}"),
2232 error_category: "protobuf_decode".to_string(),
2233 trace_id: format!("mailbox-{}", msg_record.id),
2234 request_id: None,
2235 created_at: Utc::now(),
2236 redrive_attempts: 0,
2237 last_redrive_at: None,
2238 context: Some(format!(
2239 r#"{{"source":"mailbox","priority":"{}"}}"#,
2240 match msg_record.priority {
2241 actr_runtime_mailbox::MessagePriority::High => "high",
2242 actr_runtime_mailbox::MessagePriority::Normal => "normal",
2243 }
2244 )),
2245 };
2246
2247 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2248 tracing::error!(
2249 severity = 10,
2250 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2251 dlq_err
2252 );
2253 } else {
2254 tracing::warn!(
2255 severity = 9,
2256 "☠️ Poison message moved to DLQ: message_id={}",
2257 msg_record.id
2258 );
2259 }
2260
2261 let _ = mailbox.ack(msg_record.id).await;
2263 }
2264 }
2265 }
2266 }
2267 Err(e) => {
2268 tracing::error!(
2269 severity = 9,
2270 error_category = "mailbox_error",
2271 "❌ Mailbox dequeue failed: {:?}", e
2272 );
2273 tokio::time::sleep(Duration::from_secs(1)).await;
2274 }
2275 }
2276 }
2277 }
2278 }
2279 tracing::info!("✅ Mailbox processing loop terminated gracefully");
2280 });
2281
2282 task_handles.push(mailbox_handle);
2283 }
2284 tracing::info!("✅ Mailbox processing loop started");
2285 tracing::info!("✅ ActrNode started successfully");
2286
2287 {
2288 let ready_ctx = bootstrap_ctx_builder
2289 .build_bootstrap(&actor_id, &credential_state.credential().await);
2290 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2291 let call_executor =
2292 lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2293 let mut workload = node_ref.workload_dispatch.lock().await;
2294 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2295 "on_ready",
2296 workload.on_ready(ready_ctx, invocation, &call_executor),
2297 )
2298 .await
2299 {
2300 tracing::warn!(error = %e, "workload on_ready returned Err");
2301 }
2302 }
2303
2304 let shared = Arc::new(ActrRefShared {
2306 actor_id,
2307 bootstrap_ctx_builder,
2308 credential_state,
2309 shutdown_token,
2310 task_handles: Mutex::new(task_handles),
2311 });
2312
2313 tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2315
2316 Ok(ActrRef { shared })
2317 }
2318}