1use crate::actr_ref::{ActrRef, ActrRefShared};
9use crate::ais_client::AisClient;
10use crate::context::{BootstrapContextBuilder, RuntimeContext};
11use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
12use crate::lifecycle::dedup::{DEDUP_TTL, DedupOutcome, DedupState, DedupWaiter};
13use crate::outbound::Gate;
14use crate::transport::HostTransport;
15use crate::wire::webrtc::SignalingClient;
16#[cfg(feature = "opentelemetry")]
17use crate::wire::webrtc::trace::{inject_span_context_to_rpc, set_parent_from_rpc_envelope};
18use actr_framework::Bytes;
19use actr_protocol::prost::Message as ProstMessage;
20use actr_protocol::{
21 AIdCredential, ActorResult, ActrError, ActrId, PayloadType, RegisterAuthMode, RegisterRequest,
22 RpcEnvelope, TurnCredential, register_response,
23};
24use actr_runtime::check_acl_permission;
25use actr_runtime_mailbox::{DeadLetterQueue, Mailbox};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30#[cfg(feature = "opentelemetry")]
31use tracing::Instrument as _;
32
/// Runtime state of one actor node: storage, gates, transports, registries,
/// identity and the guest workload.
///
/// `build` creates it with the identity-related fields (`actor_id`,
/// `credential_state`, ...) unset; `start()` registers the actor and fills them in.
pub(crate) struct Inner {
    /// Runtime configuration the node was built from.
    pub(crate) config: actr_config::RuntimeConfig,

    /// SQLite-backed mailbox (file path from the config, or `:memory:`).
    pub(crate) mailbox: Arc<dyn Mailbox>,

    /// Dead letter queue; a separate `<mailbox_path>.dlq` database, or an
    /// in-memory one when the mailbox is in memory.
    pub(crate) dlq: Arc<dyn DeadLetterQueue>,

    /// In-process gate (`Gate::Host`) built over `shell_to_workload`.
    pub(crate) inproc_gate: Gate,

    /// Out-of-process gate; `None` right after `build`.
    // NOTE(review): presumably installed during `start()` — confirm.
    pub(crate) outproc_gate: Option<Gate>,

    /// Data-stream registry shared with every runtime/bootstrap context this
    /// node creates.
    pub(crate) data_stream_registry: Arc<DataStreamRegistry>,

    /// Media-frame registry; shared with contexts and with the WebRTC
    /// coordinator created in `start()`.
    pub(crate) media_frame_registry: Arc<MediaFrameRegistry>,

    /// Signaling client (a `WebSocketSignalingClient` created in `build`).
    pub(crate) signaling_client: Arc<dyn SignalingClient>,

    /// This actor's id; `None` until `start()` records the registered id.
    /// `handle_incoming` fails with `ActrError::Internal` while it is unset.
    pub(crate) actor_id: Option<ActrId>,

    /// Current credential; `None` until `start()` completes registration.
    pub(crate) credential_state: Option<CredentialState>,

    /// WebRTC coordinator; `None` right after `build`.
    pub(crate) webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    /// Peer transport; created in `start()`.
    pub(crate) peer_transport: Option<Arc<crate::transport::PeerTransport>>,

    /// WebRTC gate; `None` right after `build`.
    pub(crate) webrtc_gate: Option<Arc<crate::wire::webrtc::gate::WebRtcGate>>,

    /// WebSocket gate; `None` right after `build`.
    pub(crate) websocket_gate: Option<Arc<crate::wire::websocket::WebSocketGate>>,

    /// Host transport for the shell-to-workload direction; backs `inproc_gate`.
    pub(crate) shell_to_workload: Option<Arc<HostTransport>>,

    /// Host transport for the workload-to-shell direction.
    pub(crate) workload_to_shell: Option<Arc<HostTransport>>,

    /// Cancellation token created fresh in `build`.
    // NOTE(review): presumably cancelled on node shutdown — confirm.
    pub(crate) shutdown_token: CancellationToken,

    /// Parsed `manifest.lock.toml` from the package, when one was supplied.
    pub(crate) actr_lock: Option<Arc<actr_config::lock::LockFile>>,
    /// Receiving end of the channel created by `create_network_event_handle`.
    pub(crate) network_event_rx:
        Option<tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>>,

    /// Debounce window for network events; `None` when the handle was created
    /// with `debounce_ms == 0` (or not created at all).
    pub(crate) network_event_debounce_config:
        Option<crate::lifecycle::network_event::DebounceConfig>,

    /// Request-id de-duplication state consulted by `handle_incoming`.
    pub(crate) dedup_state: Arc<Mutex<DedupState>>,

    // Not read in the code visible here, hence the `dead_code` allowance.
    #[allow(dead_code)]
    /// Package manifest supplied to `build`, if any.
    pub(crate) package_manifest: Option<actr_pack::PackageManifest>,

    /// Registration result injected via `set_preregistered_credential`; when
    /// present, `start()` uses it and skips AIS registration.
    pub(crate) preregistered_credential: Option<actr_protocol::register_response::RegisterOk>,

    /// Known WebSocket addresses keyed by peer `ActrId`; shared with the wire
    /// builder created in `start()`.
    pub(crate) discovered_ws_addresses:
        Arc<tokio::sync::RwLock<std::collections::HashMap<ActrId, String>>>,

    /// The guest workload. `handle_incoming` and data-stream callbacks lock it
    /// around each dispatch, so dispatches into the workload are serialized.
    pub(crate) workload_dispatch: Arc<Mutex<crate::workload::Workload>>,

    // `None` after `build`; only cloned into hook callbacks in the code visible here.
    #[allow(dead_code)]
    /// Observer passed to every hook callback built in `start()`.
    pub(crate) hook_observer: Option<crate::lifecycle::hooks::WorkloadHookObserverRef>,

    /// Mailbox backpressure threshold supplied to `build`.
    // NOTE(review): consumer not visible in this section — confirm semantics/units.
    pub(crate) mailbox_backpressure_threshold: usize,

    // Supplied to `build`; no reader is visible in this section.
    #[allow(dead_code)]
    /// Credential-expiry warning lead time supplied to `build`.
    pub(crate) credential_expiry_warning: Duration,
}
175
/// Shared, mutable holder for this node's current credential.
///
/// Cloning is cheap: every clone shares the same `Arc<RwLock<..>>`, so an
/// `update` made through one clone is visible to all of them.
#[derive(Clone)]
pub struct CredentialState {
    inner: Arc<RwLock<CredentialStateInner>>,
}
181
/// State guarded by the lock inside `CredentialState`.
#[derive(Clone)]
struct CredentialStateInner {
    /// The current identity credential.
    credential: AIdCredential,
    /// Credential expiry timestamp, when one was provided.
    expires_at: Option<prost_types::Timestamp>,
    /// TURN credential; `update` only overwrites it when a new one is supplied.
    turn_credential: Option<TurnCredential>,
}
189
190impl CredentialState {
191 pub fn new(
193 credential: AIdCredential,
194 expires_at: Option<prost_types::Timestamp>,
195 turn_credential: Option<TurnCredential>,
196 ) -> Self {
197 Self {
198 inner: Arc::new(RwLock::new(CredentialStateInner {
199 credential,
200 expires_at,
201 turn_credential,
202 })),
203 }
204 }
205
206 pub async fn credential(&self) -> AIdCredential {
207 self.inner.read().await.credential.clone()
208 }
209
210 pub async fn expires_at(&self) -> Option<prost_types::Timestamp> {
211 self.inner.read().await.expires_at
212 }
213
214 pub async fn turn_credential(&self) -> Option<TurnCredential> {
216 self.inner.read().await.turn_credential.clone()
217 }
218
219 pub(crate) async fn update(
223 &self,
224 credential: AIdCredential,
225 expires_at: Option<prost_types::Timestamp>,
226 turn_credential: Option<TurnCredential>,
227 ) {
228 let mut guard = self.inner.write().await;
229 guard.credential = credential;
230 guard.expires_at = expires_at;
231 if turn_credential.is_some() {
232 guard.turn_credential = turn_credential;
233 }
234 }
235}
236
237async fn host_operation_handler(
241 ctx: crate::context::RuntimeContext,
242 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
243 pending: crate::workload::HostOperation,
244) -> crate::workload::HostOperationResult {
245 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
246 use actr_framework::guest::dynclib_abi::code as abi_code;
247 use actr_framework::{Context as _, Dest};
248 use actr_protocol::{DataStream, PayloadType};
249
250 fn actr_error_to_code(err: &ActrError) -> i32 {
252 match err {
253 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
254 _ => abi_code::GENERIC_ERROR,
255 }
256 }
257
258 match pending {
259 HostOperation::CallRaw(req) => {
260 match ctx
261 .call_raw(
262 &Dest::Actor(req.target),
263 req.route_key,
264 PayloadType::RpcReliable,
265 bytes::Bytes::from(req.payload),
266 30_000,
267 )
268 .await
269 {
270 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
271 Err(e) => {
272 tracing::error!("call_raw routing failed: {e:?}");
273 HostOperationResult::Error(actr_error_to_code(&e))
274 }
275 }
276 }
277
278 HostOperation::Call(req) => {
279 let dest = match decode_dest(&req.dest) {
280 Some(d) => d,
281 None => {
282 tracing::error!(route_key = req.route_key, "call: dest decode failed");
283 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
284 }
285 };
286 match ctx
287 .call_raw(
288 &dest,
289 req.route_key,
290 PayloadType::RpcReliable,
291 bytes::Bytes::from(req.payload),
292 30_000,
293 )
294 .await
295 {
296 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
297 Err(e) => {
298 tracing::error!("call routing failed: {e:?}");
299 HostOperationResult::Error(actr_error_to_code(&e))
300 }
301 }
302 }
303
304 HostOperation::Tell(req) => {
305 let dest = match decode_dest(&req.dest) {
306 Some(d) => d,
307 None => {
308 tracing::error!(route_key = req.route_key, "tell: dest decode failed");
309 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
310 }
311 };
312 match ctx
313 .tell_raw(
314 &dest,
315 req.route_key,
316 PayloadType::RpcReliable,
317 bytes::Bytes::from(req.payload),
318 )
319 .await
320 {
321 Ok(()) => HostOperationResult::Done,
322 Err(e) => {
323 tracing::error!("tell routing failed: {e:?}");
324 HostOperationResult::Error(actr_error_to_code(&e))
325 }
326 }
327 }
328
329 HostOperation::Discover(req) => {
330 match ctx.discover_route_candidate(&req.target_type).await {
331 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
332 Err(e) => {
333 tracing::error!("discover failed: {e:?}");
334 HostOperationResult::Error(actr_error_to_code(&e))
335 }
336 }
337 }
338
339 HostOperation::RegisterStream(req) => {
340 let stream_id = req.stream_id;
341 let callback_ctx = ctx.clone();
342 let callback_workload_dispatch = workload_dispatch.clone();
343 match ctx
344 .register_stream(stream_id, move |chunk: DataStream, sender| {
345 let ctx_for_executor = callback_ctx.clone();
346 let workload_dispatch = callback_workload_dispatch.clone();
347 Box::pin(async move {
348 let invocation = crate::workload::InvocationContext {
349 self_id: actr_framework::Context::self_id(&ctx_for_executor).clone(),
350 caller_id: Some(sender.clone()),
351 request_id: format!(
352 "data-stream:{}:{}",
353 chunk.stream_id, chunk.sequence
354 ),
355 };
356 let call_executor: crate::workload::HostAbiFn =
357 std::sync::Arc::new(move |pending| {
358 let ctx = ctx_for_executor.clone();
359 Box::pin(async move {
360 stream_callback_host_operation_handler(ctx, pending).await
361 })
362 });
363 let mut guard = workload_dispatch.lock().await;
364 guard
365 .dispatch_data_stream(chunk, sender, invocation, &call_executor)
366 .await
367 })
368 })
369 .await
370 {
371 Ok(()) => HostOperationResult::Done,
372 Err(e) => {
373 tracing::error!("register_stream failed: {e:?}");
374 HostOperationResult::Error(actr_error_to_code(&e))
375 }
376 }
377 }
378
379 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
380 Ok(()) => HostOperationResult::Done,
381 Err(e) => {
382 tracing::error!("unregister_stream failed: {e:?}");
383 HostOperationResult::Error(actr_error_to_code(&e))
384 }
385 },
386
387 HostOperation::SendDataStream(req) => {
388 let dest = match decode_dest(&req.dest) {
389 Some(d) => d,
390 None => {
391 tracing::error!("send_data_stream: dest decode failed");
392 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
393 }
394 };
395 let payload_type = match PayloadType::try_from(req.payload_type) {
396 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
397 PayloadType::try_from(req.payload_type).expect("checked payload type")
398 }
399 Ok(other) => {
400 tracing::error!(?other, "send_data_stream: invalid stream payload type");
401 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
402 }
403 Err(_) => {
404 tracing::error!(
405 payload_type = req.payload_type,
406 "send_data_stream: unknown payload type"
407 );
408 return HostOperationResult::Error(abi_code::PROTOCOL_ERROR);
409 }
410 };
411 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
412 Ok(()) => HostOperationResult::Done,
413 Err(e) => {
414 tracing::error!("send_data_stream failed: {e:?}");
415 HostOperationResult::Error(actr_error_to_code(&e))
416 }
417 }
418 }
419 }
420}
421
422fn lifecycle_invocation(
423 actor_id: &ActrId,
424 request_id: &'static str,
425) -> crate::workload::InvocationContext {
426 crate::workload::InvocationContext {
427 self_id: actor_id.clone(),
428 caller_id: None,
429 request_id: request_id.to_string(),
430 }
431}
432
433pub(crate) fn lifecycle_host_abi(
434 ctx: crate::context::RuntimeContext,
435 workload_dispatch: Arc<Mutex<crate::workload::Workload>>,
436) -> crate::workload::HostAbiFn {
437 std::sync::Arc::new(move |pending| {
438 let ctx = ctx.clone();
439 let workload_dispatch = workload_dispatch.clone();
440 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
441 })
442}
443
444async fn stream_callback_host_operation_handler(
445 ctx: crate::context::RuntimeContext,
446 pending: crate::workload::HostOperation,
447) -> crate::workload::HostOperationResult {
448 use crate::workload::{HostOperation, HostOperationResult, decode_dest};
449 use actr_framework::guest::dynclib_abi::code as abi_code;
450 use actr_framework::{Context as _, Dest};
451 use actr_protocol::PayloadType;
452
453 fn actr_error_to_code(err: &ActrError) -> i32 {
454 match err {
455 ActrError::DecodeFailure(_) | ActrError::InvalidArgument(_) => abi_code::PROTOCOL_ERROR,
456 _ => abi_code::GENERIC_ERROR,
457 }
458 }
459
460 match pending {
461 HostOperation::CallRaw(req) => {
462 match ctx
463 .call_raw(
464 &Dest::Actor(req.target),
465 req.route_key,
466 PayloadType::RpcReliable,
467 bytes::Bytes::from(req.payload),
468 30_000,
469 )
470 .await
471 {
472 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
473 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
474 }
475 }
476 HostOperation::Call(req) => {
477 let dest = match decode_dest(&req.dest) {
478 Some(d) => d,
479 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
480 };
481 match ctx
482 .call_raw(
483 &dest,
484 req.route_key,
485 PayloadType::RpcReliable,
486 bytes::Bytes::from(req.payload),
487 30_000,
488 )
489 .await
490 {
491 Ok(resp) => HostOperationResult::Bytes(resp.to_vec()),
492 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
493 }
494 }
495 HostOperation::Tell(req) => {
496 let dest = match decode_dest(&req.dest) {
497 Some(d) => d,
498 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
499 };
500 match ctx
501 .tell_raw(
502 &dest,
503 req.route_key,
504 PayloadType::RpcReliable,
505 bytes::Bytes::from(req.payload),
506 )
507 .await
508 {
509 Ok(()) => HostOperationResult::Done,
510 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
511 }
512 }
513 HostOperation::Discover(req) => {
514 match ctx.discover_route_candidate(&req.target_type).await {
515 Ok(id) => HostOperationResult::Bytes(id.encode_to_vec()),
516 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
517 }
518 }
519 HostOperation::RegisterStream(_) => {
520 tracing::error!("register_stream from inside a stream callback is not supported");
521 HostOperationResult::Error(abi_code::UNSUPPORTED_OP)
522 }
523 HostOperation::UnregisterStream(req) => match ctx.unregister_stream(&req.stream_id).await {
524 Ok(()) => HostOperationResult::Done,
525 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
526 },
527 HostOperation::SendDataStream(req) => {
528 let dest = match decode_dest(&req.dest) {
529 Some(d) => d,
530 None => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
531 };
532 let payload_type = match PayloadType::try_from(req.payload_type) {
533 Ok(PayloadType::StreamReliable | PayloadType::StreamLatencyFirst) => {
534 PayloadType::try_from(req.payload_type).expect("checked payload type")
535 }
536 Ok(_) | Err(_) => return HostOperationResult::Error(abi_code::PROTOCOL_ERROR),
537 };
538 match ctx.send_data_stream(&dest, req.chunk, payload_type).await {
539 Ok(()) => HostOperationResult::Done,
540 Err(e) => HostOperationResult::Error(actr_error_to_code(&e)),
541 }
542 }
543 }
544}
545
546fn protocol_error_to_code(err: &ActrError) -> u32 {
548 match err {
549 ActrError::Unavailable(_) => 503, ActrError::ConnectionNotReady(_) => 503, ActrError::TimedOut => 504, ActrError::NotFound(_) => 404, ActrError::PermissionDenied(_) => 403, ActrError::InvalidArgument(_) => 400, ActrError::UnknownRoute(_) => 404, ActrError::DependencyNotFound { .. } => 400, ActrError::DecodeFailure(_) => 400, ActrError::NotImplemented(_) => 501, ActrError::Internal(_) => 500, }
561}
562
563impl Inner {
564 #[allow(dead_code)]
565 pub(crate) fn package_manifest(&self) -> Option<&actr_pack::PackageManifest> {
566 self.package_manifest.as_ref()
567 }
568
569 async fn network_event_loop(
576 event_rx: tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEventRequest>,
577 event_processor: Arc<dyn crate::lifecycle::network_event::NetworkEventProcessor>,
578 shutdown_token: CancellationToken,
579 ) {
580 crate::lifecycle::network_event::run_network_event_reconciler(
581 event_rx,
582 event_processor,
583 shutdown_token,
584 )
585 .await;
586 }
587
588 fn duplicate_wait_timeout(timeout_ms: i64) -> Duration {
589 if timeout_ms > 0 {
590 Duration::from_millis(timeout_ms as u64)
591 } else {
592 DEDUP_TTL
593 }
594 }
595
    /// Waits for the original execution of an in-flight duplicate request and
    /// returns a clone of its result.
    ///
    /// Returns `ActrError::Unavailable` if the waiter's sender side goes away
    /// without ever publishing a result, or if `timeout` elapses first.
    // NOTE(review): `DedupWaiter` looks like a `tokio::sync::watch::Receiver`
    // holding `Option<ActorResult<Bytes>>` (it has `borrow()` and a fallible
    // `changed()`) — confirm against its definition.
    async fn wait_for_inflight_duplicate(
        mut waiter: DedupWaiter,
        timeout: Duration,
    ) -> ActorResult<Bytes> {
        let wait_for_result = async {
            loop {
                // The result may already be published; check before waiting.
                if let Some(result) = waiter.borrow().clone() {
                    return result;
                }

                // `changed()` failing presumably means the sender was dropped.
                // Take one last look in case a final value was stored first.
                if waiter.changed().await.is_err() {
                    if let Some(result) = waiter.borrow().clone() {
                        return result;
                    }
                    return Err(ActrError::Unavailable(
                        "duplicate request result unavailable".to_string(),
                    ));
                }
            }
        };

        match tokio::time::timeout(timeout, wait_for_result).await {
            Ok(result) => result,
            Err(_) => Err(ActrError::Unavailable(format!(
                "duplicate request in-flight timed out after {}ms",
                timeout.as_millis()
            ))),
        }
    }
625
626 #[cfg_attr(
629 feature = "opentelemetry",
630 tracing::instrument(
631 skip_all,
632 name = "ActrNode.handle_incoming",
633 fields(
634 actr_id = %self.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default(),
635 route_key = %envelope.route_key,
636 request_id = %envelope.request_id,
637 )
638 )
639 )]
640 pub async fn handle_incoming(
641 &self,
642 envelope: RpcEnvelope,
643 caller_id: Option<&ActrId>,
644 ) -> ActorResult<Bytes> {
645 if let Some(caller) = caller_id {
647 tracing::debug!(
648 "📨 Handling incoming message: route_key={}, caller={}, request_id={}",
649 envelope.route_key,
650 caller,
651 envelope.request_id
652 );
653 } else {
654 tracing::debug!(
655 "📨 Handling incoming message: route_key={}, request_id={}",
656 envelope.route_key,
657 envelope.request_id
658 );
659 }
660
661 let actor_id = self.actor_id.as_ref().ok_or_else(|| {
663 ActrError::Internal(
664 "Actor ID not set - node must be started before handling messages".to_string(),
665 )
666 })?;
667
668 let acl_allowed = check_acl_permission(caller_id, actor_id, self.config.acl.as_ref())
670 .map_err(|err_msg| ActrError::Internal(format!("ACL check failed: {}", err_msg)))?;
671
672 if !acl_allowed {
673 tracing::warn!(
674 severity = 5,
675 error_category = "acl_denied",
676 request_id = %envelope.request_id,
677 route_key = %envelope.route_key,
678 caller = %caller_id
679 .map(|c| c.to_string())
680 .unwrap_or_else(|| "<none>".to_string()),
681 "🚫 ACL: Permission denied"
682 );
683
684 return Err(ActrError::PermissionDenied(format!(
685 "ACL denied: {} is not allowed to call {}",
686 caller_id
687 .map(|c| c.to_string())
688 .unwrap_or_else(|| "<unknown>".to_string()),
689 actor_id
690 )));
691 }
692
693 let outcome = {
695 self.dedup_state
696 .lock()
697 .await
698 .check_or_mark(&envelope.request_id)
699 };
700 match outcome {
701 DedupOutcome::Fresh => {} DedupOutcome::InFlight(waiter) => {
703 tracing::debug!(
704 request_id = %envelope.request_id,
705 route_key = %envelope.route_key,
706 "duplicate request in-flight; waiting for original result"
707 );
708 return Self::wait_for_inflight_duplicate(
709 waiter,
710 Self::duplicate_wait_timeout(envelope.timeout_ms),
711 )
712 .await;
713 }
714 DedupOutcome::Duplicate(cached) => {
715 tracing::debug!(
716 request_id = %envelope.request_id,
717 route_key = %envelope.route_key,
718 "♻️ returning cached response for duplicate request_id"
719 );
720 return cached;
721 }
722 }
723
724 let credential_state = self.credential_state.clone().ok_or_else(|| {
726 ActrError::Internal(
727 "Credential not set - node must be started before handling messages".to_string(),
728 )
729 })?;
730 let ctx = self.make_runtime_context(
731 actor_id,
732 caller_id, &envelope.request_id,
734 &credential_state.credential().await,
735 );
736
737 let dispatch_ctx = crate::workload::InvocationContext {
739 self_id: actor_id.clone(),
740 caller_id: caller_id.cloned(),
741 request_id: envelope.request_id.clone(),
742 };
743 let ctx_for_executor = ctx.clone();
744 let workload_for_executor = self.workload_dispatch.clone();
745 let call_executor: crate::workload::HostAbiFn = std::sync::Arc::new(move |pending| {
746 let ctx = ctx_for_executor.clone();
747 let workload_dispatch = workload_for_executor.clone();
748 Box::pin(async move { host_operation_handler(ctx, workload_dispatch, pending).await })
749 });
750
751 let mut guard = self.workload_dispatch.lock().await;
752 let result = guard
753 .dispatch_envelope(envelope.clone(), ctx.clone(), dispatch_ctx, &call_executor)
754 .await
755 .map_err(|e| ActrError::Internal(format!("workload dispatch failed: {e:?}")));
756
757 match &result {
758 Ok(_) => tracing::debug!(
759 request_id = %envelope.request_id,
760 route_key = %envelope.route_key,
761 "✅ Message handled successfully"
762 ),
763 Err(e) => tracing::error!(
764 severity = 6,
765 error_category = "handler_error",
766 request_id = %envelope.request_id,
767 route_key = %envelope.route_key,
768 "❌ Message handling failed: {:?}", e
769 ),
770 }
771
772 self.dedup_state
774 .lock()
775 .await
776 .complete(&envelope.request_id, result.clone());
777
778 result
779 }
780
781 pub(crate) async fn build(
786 config: actr_config::RuntimeConfig,
787 workload: crate::workload::Workload,
788 package_manifest: Option<actr_pack::PackageManifest>,
789 packaged_lock: Option<actr_config::lock::LockFile>,
790 mailbox_backpressure_threshold: usize,
791 credential_expiry_warning: Duration,
792 ) -> ActorResult<Self> {
793 use crate::outbound::{Gate, HostGate};
794 use crate::wire::webrtc::{ReconnectConfig, SignalingConfig, WebSocketSignalingClient};
795
796 tracing::info!("🚀 Initializing ActrNode");
797
798 let mailbox_path = config
800 .mailbox_path
801 .as_ref()
802 .map(|p| p.to_string_lossy().to_string())
803 .unwrap_or_else(|| ":memory:".to_string());
804
805 tracing::info!("📂 Mailbox database path: {}", mailbox_path);
806
807 let mailbox: Arc<dyn actr_runtime_mailbox::Mailbox> = Arc::new(
808 actr_runtime_mailbox::SqliteMailbox::new(&mailbox_path)
809 .await
810 .map_err(|e| {
811 actr_protocol::ActrError::Unavailable(format!("Mailbox init failed: {e}"))
812 })?,
813 );
814
815 let dlq_path = if mailbox_path == ":memory:" {
817 ":memory:".to_string()
818 } else {
819 format!("{mailbox_path}.dlq")
820 };
821
822 let dlq: Arc<dyn actr_runtime_mailbox::DeadLetterQueue> = Arc::new(
823 actr_runtime_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
824 .await
825 .map_err(|e| {
826 actr_protocol::ActrError::Unavailable(format!("DLQ init failed: {e}"))
827 })?,
828 );
829 tracing::info!("✅ Dead Letter Queue initialized");
830
831 let webrtc_role = if config.webrtc.advanced.prefer_answerer() {
833 Some("answer".to_string())
834 } else {
835 None
836 };
837
838 let signaling_config = SignalingConfig {
839 server_url: config.signaling_url.clone(),
840 connection_timeout: 30,
841 heartbeat_interval: 30,
842 reconnect_config: ReconnectConfig::default(),
843 auth_config: None,
844 webrtc_role,
845 };
846
847 let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
848 client.start_reconnect_manager();
849 let signaling_client: Arc<dyn crate::wire::webrtc::SignalingClient> = client;
850
851 let shell_to_workload = Arc::new(HostTransport::new());
853 let workload_to_shell = Arc::new(HostTransport::new());
854 let inproc_gate = Gate::Host(Arc::new(HostGate::new(shell_to_workload.clone())));
855
856 let data_stream_registry = Arc::new(DataStreamRegistry::new());
857 let media_frame_registry = Arc::new(MediaFrameRegistry::new());
858
859 tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Guest)");
860
861 let actr_lock = if let Some(lock) = packaged_lock {
862 tracing::info!(
863 "📋 Loaded packaged manifest.lock.toml with {} dependencies",
864 lock.dependencies.len()
865 );
866 Some(Arc::new(lock))
867 } else {
868 tracing::warn!(
869 "⚠️ manifest.lock.toml not found in package. Continuing without dependency fingerprints."
870 );
871 None
872 };
873
874 tracing::info!("✅ ActrNode initialized");
875
876 Ok(Self {
877 config,
878 mailbox,
879 dlq,
880 inproc_gate,
881 outproc_gate: None, data_stream_registry,
883 media_frame_registry,
884 signaling_client,
885 actor_id: None,
886 credential_state: None,
887 webrtc_coordinator: None,
888 peer_transport: None,
889 webrtc_gate: None,
890 websocket_gate: None,
891 shell_to_workload: Some(shell_to_workload),
892 workload_to_shell: Some(workload_to_shell),
893 shutdown_token: CancellationToken::new(),
894 actr_lock,
895 network_event_rx: None,
896 network_event_debounce_config: None,
897 dedup_state: Arc::new(Mutex::new(DedupState::new())),
898 package_manifest,
899 preregistered_credential: None,
900 discovered_ws_addresses: Arc::new(tokio::sync::RwLock::new(
901 std::collections::HashMap::new(),
902 )),
903 workload_dispatch: Arc::new(Mutex::new(workload)),
904 hook_observer: None,
905 mailbox_backpressure_threshold,
906 credential_expiry_warning,
907 })
908 }
909
910 pub(crate) fn bootstrap_ctx_builder(&self) -> BootstrapContextBuilder {
918 BootstrapContextBuilder::new(
919 self.inproc_gate.clone(),
920 self.outproc_gate.clone(),
921 self.data_stream_registry.clone(),
922 self.media_frame_registry.clone(),
923 self.signaling_client.clone(),
924 self.actr_lock.clone(),
925 )
926 }
927
928 pub(crate) fn make_runtime_context(
933 &self,
934 self_id: &ActrId,
935 caller_id: Option<&ActrId>,
936 request_id: &str,
937 credential: &AIdCredential,
938 ) -> RuntimeContext {
939 RuntimeContext::new(
940 self_id.clone(),
941 caller_id.cloned(),
942 request_id.to_string(),
943 self.inproc_gate.clone(),
944 self.outproc_gate.clone(),
945 self.data_stream_registry.clone(),
946 self.media_frame_registry.clone(),
947 self.signaling_client.clone(),
948 credential.clone(),
949 self.actr_lock.clone(),
950 )
951 }
952
953 pub fn create_network_event_handle(
961 &mut self,
962 debounce_ms: u64,
963 ) -> crate::lifecycle::NetworkEventHandle {
964 if self.network_event_rx.is_some() {
965 panic!("create_network_event_handle() can only be called once");
966 }
967
968 let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
969
970 let debounce_config = if debounce_ms > 0 {
971 Some(crate::lifecycle::network_event::DebounceConfig {
972 window: std::time::Duration::from_millis(debounce_ms),
973 })
974 } else {
975 None
976 };
977
978 self.network_event_rx = Some(event_rx);
979 self.network_event_debounce_config = debounce_config;
980
981 tracing::info!(
982 debounce_ms,
983 channel_capacity = 100_u64,
984 "network_event.node.handle_created"
985 );
986
987 crate::lifecycle::NetworkEventHandle::new(event_tx)
988 }
989
990 pub fn set_preregistered_credential(&mut self, register_ok: register_response::RegisterOk) {
995 tracing::debug!("Pre-registered credential attached; start() will skip AIS registration");
996 self.preregistered_credential = Some(register_ok);
997 }
998
999 pub async fn start(mut self) -> ActorResult<ActrRef> {
1001 tracing::info!("🚀 Starting ActrNode");
1002 tracing::info!("Actr Rust version: {}", env!("CARGO_PKG_VERSION"));
1003
1004 let actr_type = self.config.actr_type().clone();
1009 tracing::info!("📋 Actor type: {}", actr_type);
1010
1011 let service_spec = None;
1017
1018 let ws_address = if let Some(port) = self.config.websocket_listen_port {
1021 let host = self
1022 .config
1023 .websocket_advertised_host
1024 .as_deref()
1025 .unwrap_or("127.0.0.1");
1026 Some(format!("ws://{}:{}", host, port))
1027 } else {
1028 None
1029 };
1030
1031 if let Some(ref addr) = ws_address {
1032 tracing::info!(
1033 "📡 Advertising WebSocket address to signaling server: {}",
1034 addr
1035 );
1036 }
1037
1038 let register_request = RegisterRequest {
1039 actr_type: actr_type.clone(),
1040 realm: self.config.realm,
1041 service_spec,
1042 acl: self.config.acl.clone(),
1043 service: None,
1044 ws_address,
1045 auth_mode: Some(RegisterAuthMode::Linked as i32),
1046 ..Default::default()
1047 };
1048
1049 let register_ok = if let Some(injected) = self.preregistered_credential.take() {
1053 tracing::info!(
1054 "Using Hyper pre-injected registration credential; skipping AIS registration"
1055 );
1056 injected
1057 } else {
1058 let ais_endpoint = &self.config.ais_endpoint;
1059 tracing::info!(
1060 ais_endpoint = %ais_endpoint,
1061 "Registering actor with AIS via HTTP"
1062 );
1063 let mut ais = AisClient::new(ais_endpoint);
1064 if let Some(ref secret) = self.config.realm_secret {
1065 ais = ais.with_realm_secret(secret);
1066 }
1067 let resp = ais
1068 .register_linked(register_request.clone())
1069 .await
1070 .map_err(|e| ActrError::Unavailable(format!("AIS registration failed: {e}")))?;
1071 match resp.result {
1072 Some(register_response::Result::Success(ok)) => {
1073 tracing::info!("✅ AIS HTTP registration successful");
1074 ok
1075 }
1076 Some(register_response::Result::Error(error)) => {
1077 tracing::error!(
1078 severity = 10,
1079 error_category = "registration_error",
1080 error_code = error.code,
1081 "❌ AIS registration failed: code={}, message={}",
1082 error.code,
1083 error.message
1084 );
1085 return Err(ActrError::Unavailable(format!(
1086 "AIS registration rejected: {} (code: {})",
1087 error.message, error.code
1088 )));
1089 }
1090 None => {
1091 tracing::error!(
1092 severity = 10,
1093 error_category = "registration_error",
1094 "❌ AIS registration response missing result"
1095 );
1096 return Err(ActrError::Unavailable(
1097 "Invalid AIS registration response: missing result".to_string(),
1098 ));
1099 }
1100 }
1101 };
1102
1103 let pre_connect_credential_state = {
1110 let actor_id = register_ok.actr_id.clone();
1111 let credential_state = CredentialState::new(
1112 register_ok.credential.clone(),
1113 register_ok.credential_expires_at,
1114 Some(register_ok.turn_credential.clone()),
1115 );
1116 self.signaling_client.set_actor_id(actor_id).await;
1117 self.signaling_client
1118 .set_credential_state(credential_state.clone())
1119 .await;
1120 credential_state
1121 };
1122
1123 {
1129 let actor_id = register_ok.actr_id.clone();
1130 let credential_state = pre_connect_credential_state.clone();
1131 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1135 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder = Arc::new(move || {
1136 let snapshot = ctx_builder_snapshot.clone();
1137 let actor_id = actor_id.clone();
1138 let credential_state = credential_state.clone();
1139 Box::pin(async move {
1140 Some(snapshot.build_bootstrap(&actor_id, &credential_state.credential().await))
1141 })
1142 });
1143 let cb = crate::lifecycle::hooks::build_hook_callback(
1144 self.hook_observer.clone(),
1145 ctx_builder,
1146 );
1147 self.signaling_client.set_hook_callback(cb);
1148 }
1149
1150 tracing::info!("📡 Connecting to signaling server (with credential)");
1151 self.signaling_client
1152 .connect()
1153 .await
1154 .map_err(|e| ActrError::Unavailable(format!("Signaling connect failed: {e}")))?;
1155 tracing::info!("✅ Connected to signaling server");
1156
1157 let mut task_handles = Vec::new();
1159
1160 let node_hook_callback: Option<crate::wire::webrtc::HookCallback>;
1164
1165 {
1166 let actor_id = register_ok.actr_id;
1167 let credential = register_ok.credential;
1168
1169 tracing::info!("🆔 Assigned ActrId: {}", actor_id);
1170 tracing::info!("🔐 Received credential (key_id: {})", credential.key_id);
1171 tracing::info!(
1172 "💓 Signaling heartbeat interval: {} seconds",
1173 register_ok.signaling_heartbeat_interval_secs
1174 );
1175
1176 tracing::debug!("TurnCredential received, TURN authentication ready");
1178
1179 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1180 tracing::debug!("⏰ Credential expires at: {}s", expires_at.seconds);
1181 }
1182
1183 self.actor_id = Some(actor_id.clone());
1185 let credential_state = CredentialState::new(
1186 credential,
1187 register_ok.credential_expires_at,
1188 Some(register_ok.turn_credential.clone()),
1189 );
1190 self.credential_state = Some(credential_state.clone());
1191
1192 node_hook_callback =
1203 {
1204 let actor_id_for_hook = actor_id.clone();
1205 let credential_state_for_hook = credential_state.clone();
1206 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1211 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1212 Arc::new(move || {
1213 let snapshot = ctx_builder_snapshot.clone();
1214 let actor_id = actor_id_for_hook.clone();
1215 let credential_state = credential_state_for_hook.clone();
1216 Box::pin(async move {
1217 Some(snapshot.build_bootstrap(
1218 &actor_id,
1219 &credential_state.credential().await,
1220 ))
1221 })
1222 });
1223 Some(crate::lifecycle::hooks::build_hook_callback(
1224 self.hook_observer.clone(),
1225 ctx_builder,
1226 ))
1227 };
1228
1229 if let Some(expires_at) = ®ister_ok.credential_expires_at {
1234 let new_expiry = std::time::UNIX_EPOCH
1235 + std::time::Duration::from_secs(expires_at.seconds.max(0) as u64);
1236 if let Some(cb) = node_hook_callback.as_ref() {
1237 cb(crate::wire::webrtc::HookEvent::CredentialRenewed { new_expiry }).await;
1238 } else {
1239 tracing::info!(new_expiry = ?new_expiry, "credential renewed");
1240 }
1241 }
1242
1243 tracing::info!("✅ Inproc infrastructure already ready (created in ActrNode::build())");
1251
1252 tracing::info!("🌐 Initializing WebRTC infrastructure");
1256
1257 let media_frame_registry = self.media_frame_registry.clone();
1258
1259 let coordinator = Arc::new(crate::wire::webrtc::WebRtcCoordinator::new(
1261 actor_id.clone(),
1262 credential_state.clone(),
1263 self.signaling_client.clone(),
1264 self.config.webrtc.clone(),
1265 media_frame_registry,
1266 ));
1267
1268 {
1272 let actor_id_for_hook = actor_id.clone();
1273 let credential_state_for_hook = credential_state.clone();
1274 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1278 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1279 Arc::new(move || {
1280 let snapshot = ctx_builder_snapshot.clone();
1281 let actor_id = actor_id_for_hook.clone();
1282 let credential_state = credential_state_for_hook.clone();
1283 Box::pin(async move {
1284 Some(
1285 snapshot.build_bootstrap(
1286 &actor_id,
1287 &credential_state.credential().await,
1288 ),
1289 )
1290 })
1291 });
1292 let cb = crate::lifecycle::hooks::build_hook_callback(
1293 self.hook_observer.clone(),
1294 ctx_builder,
1295 );
1296 coordinator.set_hook_callback(cb);
1297 }
1298
1299 tracing::info!("🏗️ Creating PeerTransport with WebRTC support");
1303
1304 use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig};
1306
1307 let local_id_hex = hex::encode(actor_id.encode_to_vec());
1310 let wire_builder_config = DefaultWireBuilderConfig {
1311 local_id_hex,
1312 enable_webrtc: true,
1313 enable_websocket: true,
1314 discovered_ws_addresses: self.discovered_ws_addresses.clone(),
1317 credential_state: Some(credential_state.clone()),
1320 };
1321 let wire_builder = Arc::new(DefaultWireBuilder::new(
1322 Some(coordinator.clone()),
1323 wire_builder_config,
1324 ));
1325
1326 use crate::transport::PeerTransport;
1328 let transport_manager = Arc::new(PeerTransport::new(actor_id.clone(), wire_builder));
1329 self.peer_transport = Some(transport_manager.clone());
1330
1331 use crate::outbound::PeerGate;
1333 let outproc_gate =
1334 Arc::new(PeerGate::new(transport_manager, Some(coordinator.clone())));
1335 let outproc_gate_enum = Gate::Peer(outproc_gate.clone());
1336 tracing::info!("PeerTransport + PeerGate initialized");
1337
1338 let data_stream_registry = self.data_stream_registry.clone();
1339
1340 let pending_requests = outproc_gate.get_pending_requests();
1342 let gate = Arc::new(crate::wire::webrtc::gate::WebRtcGate::new(
1343 coordinator.clone(),
1344 pending_requests,
1345 data_stream_registry.clone(),
1346 ));
1347 gate.set_local_id(actor_id.clone()).await;
1349 tracing::info!(
1350 "✅ WebRtcGate created with shared pending_requests and DataStreamRegistry"
1351 );
1352
1353 tracing::info!("🔧 Wiring outproc_gate into node");
1360 self.outproc_gate = Some(outproc_gate_enum);
1361 tracing::info!("✅ Node runtime gates fully initialized (inproc + outproc)");
1362
1363 self.webrtc_coordinator = Some(coordinator.clone());
1365 self.webrtc_gate = Some(gate.clone());
1366 tracing::info!("✅ WebRTC infrastructure initialized");
1367
1368 {
1372 let startup_ctx = self
1373 .bootstrap_ctx_builder()
1374 .build_bootstrap(&actor_id, &credential_state.credential().await);
1375 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_start");
1376 let call_executor =
1377 lifecycle_host_abi(startup_ctx.clone(), self.workload_dispatch.clone());
1378 let mut workload = self.workload_dispatch.lock().await;
1379 crate::lifecycle::hooks::call_lifecycle_hook(
1380 "on_start",
1381 workload.on_start(startup_ctx, invocation, &call_executor),
1382 )
1383 .await?;
1384 }
1385
1386 if let Some(listen_port) = self.config.websocket_listen_port {
1390 tracing::info!(
1391 "🔌 WebSocket direct-connect mode enabled, binding port {}",
1392 listen_port
1393 );
1394 use crate::key_cache::AisKeyCache;
1395 use crate::wire::websocket::gate::WsAuthContext;
1396 use crate::wire::websocket::{WebSocketGate, WebSocketServer};
1397
1398 let ais_key_cache = AisKeyCache::new();
1400 if !register_ok.signing_pubkey.is_empty() {
1401 match ais_key_cache
1402 .seed(register_ok.signing_key_id, ®ister_ok.signing_pubkey)
1403 .await
1404 {
1405 Ok(()) => tracing::info!(
1406 key_id = register_ok.signing_key_id,
1407 "🔑 AisKeyCache seeded from RegisterOk"
1408 ),
1409 Err(e) => tracing::warn!(
1410 key_id = register_ok.signing_key_id,
1411 error = ?e,
1412 "AisKeyCache seed failed; WebSocket will reject all inbound connections"
1413 ),
1414 }
1415 } else {
1416 tracing::warn!(
1417 "RegisterOk missing signing_pubkey; WebSocket credential verification will degrade"
1418 );
1419 }
1420
1421 let auth_ctx = WsAuthContext {
1422 ais_key_cache,
1423 actor_id: actor_id.clone(),
1424 credential_state: credential_state.clone(),
1425 signaling_client: self.signaling_client.clone(),
1426 };
1427
1428 match WebSocketServer::bind(listen_port).await {
1429 Ok((ws_server, conn_rx)) => {
1430 ws_server.start(self.shutdown_token.clone());
1431 let ws_gate = Arc::new(WebSocketGate::new(
1432 conn_rx,
1433 outproc_gate.get_pending_requests(),
1434 data_stream_registry.clone(),
1435 Some(auth_ctx),
1436 ));
1437
1438 {
1440 let actor_id_for_hook = actor_id.clone();
1441 let credential_state_for_hook = credential_state.clone();
1442 let ctx_builder_snapshot = self.bootstrap_ctx_builder();
1446 let ctx_builder: crate::lifecycle::hooks::HookContextBuilder =
1447 Arc::new(move || {
1448 let snapshot = ctx_builder_snapshot.clone();
1449 let actor_id = actor_id_for_hook.clone();
1450 let credential_state = credential_state_for_hook.clone();
1451 Box::pin(async move {
1452 Some(snapshot.build_bootstrap(
1453 &actor_id,
1454 &credential_state.credential().await,
1455 ))
1456 })
1457 });
1458 let cb = crate::lifecycle::hooks::build_hook_callback(
1459 self.hook_observer.clone(),
1460 ctx_builder,
1461 );
1462 ws_gate.set_hook_callback(cb);
1463 }
1464
1465 self.websocket_gate = Some(ws_gate);
1466 tracing::info!(
1467 "✅ WebSocketServer + WebSocketGate initialized (credential auth enabled)"
1468 );
1469 }
1470 Err(e) => {
1471 tracing::error!(
1472 "❌ Failed to bind WebSocket server on port {}: {:?}",
1473 listen_port,
1474 e
1475 );
1476 }
1477 }
1478 }
1479
1480 {
1489 let shutdown = self.shutdown_token.clone();
1490 let client = self.signaling_client.clone();
1491 let actor_id_for_heartbeat = actor_id.clone();
1492 let credential_state_for_heartbeat = credential_state.clone();
1493 let mailbox_for_heartbeat = self.mailbox.clone();
1494 let register_request_for_heartbeat = register_request.clone();
1495 let webrtc_coordinator_for_heartbeat = self.webrtc_coordinator.clone();
1496 let webrtc_gate_for_heartbeat = self.webrtc_gate.clone();
1497
1498 let heartbeat_interval_secs = register_ok.signaling_heartbeat_interval_secs;
1500 let heartbeat_interval = if heartbeat_interval_secs > 0 {
1501 Duration::from_secs(heartbeat_interval_secs as u64)
1502 } else {
1503 Duration::from_secs(30)
1504 };
1505 let ais_endpoint_for_heartbeat = self.config.ais_endpoint.clone();
1506 let realm_secret_for_heartbeat = self.config.realm_secret.clone();
1507 let heartbeat_handle = tokio::spawn(crate::lifecycle::heartbeat::heartbeat_task(
1508 shutdown,
1509 client,
1510 actor_id_for_heartbeat,
1511 credential_state_for_heartbeat,
1512 mailbox_for_heartbeat,
1513 heartbeat_interval,
1514 register_request_for_heartbeat,
1515 ais_endpoint_for_heartbeat,
1516 realm_secret_for_heartbeat,
1517 node_hook_callback.clone(),
1518 webrtc_coordinator_for_heartbeat,
1519 webrtc_gate_for_heartbeat,
1520 ));
1521 task_handles.push(heartbeat_handle);
1522 }
1523 tracing::info!(
1524 "✅ Heartbeat task started (interval: {}s)",
1525 register_ok.signaling_heartbeat_interval_secs
1526 );
1527
1528 if let Some(event_rx) = self.network_event_rx.take() {
1532 use crate::lifecycle::network_event::DefaultNetworkEventProcessor;
1533
1534 let event_processor =
1537 if let Some(config) = self.network_event_debounce_config.clone() {
1538 Arc::new(
1539 DefaultNetworkEventProcessor::new_with_debounce_and_peer_transport(
1540 self.signaling_client.clone(),
1541 self.webrtc_coordinator.clone(),
1542 config,
1543 self.peer_transport.clone(),
1544 ),
1545 )
1546 } else {
1547 Arc::new(DefaultNetworkEventProcessor::new_with_peer_transport(
1548 self.signaling_client.clone(),
1549 self.webrtc_coordinator.clone(),
1550 self.peer_transport.clone(),
1551 ))
1552 };
1553
1554 let shutdown = self.shutdown_token.clone();
1555 let network_event_handle = tokio::spawn(async move {
1556 Self::network_event_loop(event_rx, event_processor, shutdown).await;
1557 });
1558 task_handles.push(network_event_handle);
1559 tracing::info!("network_event.node.loop_started");
1560 } else {
1561 tracing::debug!("network_event.node.loop_not_started_no_handle");
1562 }
1563
1564 {
1565 let shutdown = self.shutdown_token.clone();
1576 let client = self.signaling_client.clone();
1577 let actor_id_for_unreg = actor_id.clone();
1578 let credential_state_for_unreg = credential_state.clone();
1579 let webrtc_coordinator = self.webrtc_coordinator.clone();
1580
1581 let unregister_handle = tokio::spawn(async move {
1582 shutdown.cancelled().await;
1584 tracing::info!(
1585 "📡 Shutdown signal received, sending UnregisterRequest for Actor {}",
1586 actor_id_for_unreg
1587 );
1588
1589 if let Some(coord) = webrtc_coordinator {
1591 if let Err(e) = coord.close_all_peers().await {
1592 tracing::warn!(
1593 "⚠️ Failed to close all WebRTC peers before UnregisterRequest: {}",
1594 e
1595 );
1596 } else {
1597 tracing::info!("✅ All WebRTC peers closed before UnregisterRequest");
1598 }
1599 } else {
1600 tracing::debug!(
1601 "WebRTC coordinator not found before UnregisterRequest (no WebRTC?)"
1602 );
1603 }
1604
1605 let result = tokio::time::timeout(
1607 Duration::from_secs(5),
1608 client.send_unregister_request(
1609 actor_id_for_unreg.clone(),
1610 credential_state_for_unreg.credential().await,
1611 Some("Graceful shutdown".to_string()),
1612 ),
1613 )
1614 .await;
1615 tracing::info!("UnregisterRequest result: {:?}", result);
1616 match result {
1617 Ok(Ok(_)) => {
1618 tracing::info!(
1619 "✅ UnregisterRequest sent to signaling server for Actor {}",
1620 actor_id_for_unreg
1621 );
1622 }
1623 Ok(Err(e)) => {
1624 tracing::warn!(
1625 "⚠️ Failed to send UnregisterRequest for Actor {}: {}",
1626 actor_id_for_unreg,
1627 e
1628 );
1629 }
1630 Err(_) => {
1631 tracing::warn!(
1632 "⚠️ UnregisterRequest timeout (5s) for Actor {}",
1633 actor_id_for_unreg
1634 );
1635 }
1636 }
1637 });
1638
1639 task_handles.push(unregister_handle);
1640 }
1641 } tracing::info!("✅ Transport layer initialized via WebRTC infrastructure");
1647
1648 let actor_id = self
1653 .actor_id
1654 .as_ref()
1655 .ok_or_else(|| ActrError::Internal("Actor ID not set".to_string()))?
1656 .clone();
1657 let bootstrap_ctx_builder = self.bootstrap_ctx_builder();
1661 let credential_state = self
1662 .credential_state
1663 .clone()
1664 .expect("CredentialState must be initialized in start()");
1665 let shutdown_token = self.shutdown_token.clone();
1666 let node_ref = Arc::new(self);
1667
1668 {
1672 let node = node_ref.clone();
1673 let actor_id = actor_id.clone();
1674 let credential_state = credential_state.clone();
1675 let shutdown = shutdown_token.clone();
1676 let on_stop_handle = tokio::spawn(async move {
1677 shutdown.cancelled().await;
1678 let stop_ctx = node
1679 .bootstrap_ctx_builder()
1680 .build_bootstrap(&actor_id, &credential_state.credential().await);
1681 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_stop");
1682 let call_executor =
1683 lifecycle_host_abi(stop_ctx.clone(), node.workload_dispatch.clone());
1684 let mut workload = node.workload_dispatch.lock().await;
1685 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
1686 "on_stop",
1687 workload.on_stop(stop_ctx, invocation, &call_executor),
1688 )
1689 .await
1690 {
1691 tracing::warn!(error = %e, "workload on_stop returned Err");
1692 }
1693 });
1694 task_handles.push(on_stop_handle);
1695 }
1696
1697 tracing::info!("🚀 Starting WebRTC background loops");
1701
1702 if let Some(coordinator) = &node_ref.webrtc_coordinator {
1704 coordinator.clone().start().await.map_err(|e| {
1705 ActrError::Unavailable(format!("WebRtcCoordinator start failed: {e}"))
1706 })?;
1707 tracing::info!("✅ WebRtcCoordinator signaling loop started");
1708 }
1709
1710 if let Some(gate) = &node_ref.webrtc_gate {
1712 gate.start_receive_loop(node_ref.mailbox.clone())
1713 .await
1714 .map_err(|e| {
1715 ActrError::Unavailable(format!("WebRtcGate receive loop start failed: {e}"))
1716 })?;
1717 tracing::info!("✅ WebRtcGate → Mailbox routing started");
1718 }
1719
1720 if let Some(ws_gate) = &node_ref.websocket_gate {
1722 ws_gate
1723 .start_receive_loop(node_ref.mailbox.clone())
1724 .await
1725 .map_err(|e| {
1726 ActrError::Unavailable(format!("WebSocketGate receive loop start failed: {e}"))
1727 })?;
1728 tracing::info!("✅ WebSocketGate → Mailbox routing started");
1729 }
1730 tracing::info!("✅ WebRTC background loops started");
1731
1732 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1736 tracing::info!("🔄 Starting Inproc receive loop (Shell → Guest)");
1737 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1739 let node = node_ref.clone();
1740 let request_rx_lane = shell_to_workload
1741 .get_lane(PayloadType::RpcReliable, None)
1742 .await
1743 .map_err(|e| {
1744 ActrError::Unavailable(format!("Failed to get guest receive lane: {e}"))
1745 })?;
1746 let response_tx = workload_to_shell.clone();
1747 let shutdown = shutdown_token.clone();
1748
1749 let inproc_handle = tokio::spawn(async move {
1750 loop {
1751 tokio::select! {
1752 _ = shutdown.cancelled() => {
1753 tracing::info!("📭 Guest receive loop (Shell → Guest) received shutdown signal");
1754 break;
1755 }
1756 envelope_result = request_rx_lane.recv_envelope() => {
1757 match envelope_result {
1758 Ok(envelope) => {
1759 let request_id = envelope.request_id.clone();
1760 tracing::debug!("📨 Guest received REQUEST from Shell: request_id={}", request_id);
1761 #[cfg(feature = "opentelemetry")]
1763 let span = {
1764 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
1765 let span = tracing::info_span!("ActrNode.lane_receive", actr_id = %actr_id_str, request_id = %request_id);
1766 set_parent_from_rpc_envelope(&span, &envelope);
1767 span
1768 };
1769
1770 let handle_incoming_fut = node.handle_incoming(envelope.clone(), None);
1772 #[cfg(feature = "opentelemetry")]
1773 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
1774
1775 match handle_incoming_fut.await {
1776 Ok(response_bytes) => {
1777 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1780 let mut response_envelope = RpcEnvelope {
1781 route_key: envelope.route_key.clone(),
1782 payload: Some(response_bytes),
1783 error: None,
1784 traceparent: None,
1785 tracestate: None,
1786 request_id: request_id.clone(),
1787 metadata: Vec::new(),
1788 timeout_ms: 30000,
1789 };
1790 #[cfg(feature = "opentelemetry")]
1792 inject_span_context_to_rpc(&span, &mut response_envelope);
1793
1794 let send_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, response_envelope);
1796 #[cfg(feature = "opentelemetry")]
1797 let send_response_fut = send_response_fut.instrument(span.clone());
1798 if let Err(e) = send_response_fut.await {
1799 tracing::error!(
1800 severity = 7,
1801 error_category = "transport_error",
1802 request_id = %request_id,
1803 "❌ Failed to send RESPONSE to Shell: {:?}",
1804 e
1805 );
1806 }
1807 }
1808 Err(e) => {
1809 tracing::error!(
1810 severity = 6,
1811 error_category = "handler_error",
1812 request_id = %request_id,
1813 route_key = %envelope.route_key,
1814 "❌ Guest message handling failed: {:?}",
1815 e
1816 );
1817
1818 let error_response = actr_protocol::ErrorResponse {
1820 code: protocol_error_to_code(&e),
1821 message: e.to_string(),
1822 };
1823 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
1824 let mut error_envelope = RpcEnvelope {
1825 route_key: envelope.route_key.clone(),
1826 payload: None,
1827 error: Some(error_response),
1828 traceparent: envelope.traceparent.clone(),
1829 tracestate: envelope.tracestate.clone(),
1830 request_id: request_id.clone(),
1831 metadata: Vec::new(),
1832 timeout_ms: 30000,
1833 };
1834 #[cfg(feature = "opentelemetry")]
1836 inject_span_context_to_rpc(&span, &mut error_envelope);
1837
1838 let send_error_response_fut = response_tx.send_message(PayloadType::RpcReliable, None, error_envelope);
1839 #[cfg(feature = "opentelemetry")]
1840 let send_error_response_fut = send_error_response_fut.instrument(span);
1841 if let Err(send_err) = send_error_response_fut.await {
1842 tracing::error!(
1843 severity = 7,
1844 error_category = "transport_error",
1845 request_id = %request_id,
1846 "❌ Failed to send ERROR response to Shell: {:?}",
1847 send_err
1848 );
1849 }
1850 }
1851 }
1852 }
1853 Err(e) => {
1854 tracing::error!(
1855 severity = 8,
1856 error_category = "transport_error",
1857 "❌ Failed to receive from Shell → Guest lane: {:?}",
1858 e
1859 );
1860 break;
1861 }
1862 }
1863 }
1864 }
1865 }
1866 tracing::info!("✅ Guest receive loop (Shell → Guest) terminated gracefully");
1867 });
1868 task_handles.push(inproc_handle);
1869 }
1870 }
1871 tracing::info!("✅ Guest receive loop (Shell → Guest REQUEST) started");
1872
1873 tracing::info!("🔄 Starting Shell receive loop (Guest → Shell RESPONSE)");
1877 if let Some(workload_to_shell) = &node_ref.workload_to_shell {
1878 if let Some(shell_to_workload) = &node_ref.shell_to_workload {
1880 let response_rx_lane = workload_to_shell
1881 .get_lane(PayloadType::RpcReliable, None)
1882 .await
1883 .map_err(|e| {
1884 ActrError::Unavailable(format!("Failed to get shell receive lane: {e}"))
1885 })?;
1886 let request_mgr = shell_to_workload.clone();
1887 let shutdown = shutdown_token.clone();
1888
1889 let shell_receive_handle = tokio::spawn(async move {
1890 loop {
1891 tokio::select! {
1892 _ = shutdown.cancelled() => {
1893 tracing::info!("📭 Shell receive loop (Guest → Shell) received shutdown signal");
1894 break;
1895 }
1896 envelope_result = response_rx_lane.recv_envelope() => {
1897 match envelope_result {
1898 Ok(envelope) => {
1899 tracing::debug!(
1900 "📨 Shell received RESPONSE from Guest: request_id={}",
1901 envelope.request_id
1902 );
1903
1904 match (envelope.payload, envelope.error) {
1906 (Some(payload), None) => {
1907 if let Err(e) = request_mgr
1909 .complete_response(&envelope.request_id, payload)
1910 .await
1911 {
1912 tracing::warn!(
1913 severity = 4,
1914 error_category = "orphan_response",
1915 request_id = %envelope.request_id,
1916 "⚠️ No pending request found for response: {:?}",
1917 e
1918 );
1919 }
1920 }
1921 (None, Some(error)) => {
1922 let actr_err = ActrError::Unavailable(format!("RPC error {}: {}", error.code, error.message));
1924 if let Err(e) = request_mgr
1925 .complete_error(&envelope.request_id, actr_err)
1926 .await
1927 {
1928 tracing::warn!(
1929 severity = 4,
1930 error_category = "orphan_response",
1931 request_id = %envelope.request_id,
1932 "⚠️ No pending request found for error response: {:?}",
1933 e
1934 );
1935 }
1936 }
1937 _ => {
1938 tracing::error!(
1939 severity = 7,
1940 error_category = "protocol_error",
1941 request_id = %envelope.request_id,
1942 "❌ Invalid RpcEnvelope: both payload and error are present or both absent"
1943 );
1944 }
1945 }
1946 }
1947 Err(e) => {
1948 tracing::error!(
1949 severity = 8,
1950 error_category = "transport_error",
1951 "❌ Failed to receive from Guest → Shell lane: {:?}",
1952 e
1953 );
1954 break;
1955 }
1956 }
1957 }
1958 }
1959 }
1960 tracing::info!("✅ Shell receive loop (Guest → Shell) terminated gracefully");
1961 });
1962 task_handles.push(shell_receive_handle);
1963 }
1964 }
1965 tracing::info!("✅ Shell receive loop (Guest → Shell RESPONSE) started");
1966
1967 let backpressure_threshold = node_ref.mailbox_backpressure_threshold;
1982 {
1983 use std::sync::atomic::{AtomicBool, Ordering};
1984 let mailbox = node_ref.mailbox.clone();
1985 let shutdown = shutdown_token.clone();
1986 let hook_cb = node_hook_callback.clone();
1987 let triggered = Arc::new(AtomicBool::new(false));
1988
1989 let fire_if_rising = {
1992 let triggered = triggered.clone();
1993 let hook_cb = hook_cb.clone();
1994 Arc::new(move |queue_len: usize| {
1995 if queue_len >= backpressure_threshold {
1996 if !triggered.swap(true, Ordering::AcqRel) {
1997 if let Some(cb) = hook_cb.as_ref() {
1998 let cb = cb.clone();
1999 tokio::spawn(async move {
2000 cb(crate::wire::webrtc::HookEvent::MailboxBackpressure {
2001 queue_len,
2002 threshold: backpressure_threshold,
2003 })
2004 .await;
2005 });
2006 } else {
2007 tracing::warn!(
2008 queue_len,
2009 threshold = backpressure_threshold,
2010 "mailbox backpressure",
2011 );
2012 }
2013 }
2014 } else if triggered.swap(false, Ordering::AcqRel) {
2015 tracing::info!(
2016 queue_len,
2017 threshold = backpressure_threshold,
2018 "mailbox backpressure cleared",
2019 );
2020 }
2021 })
2022 };
2023
2024 struct EnqueueObserver {
2028 fire: Arc<dyn Fn(usize) + Send + Sync + 'static>,
2029 }
2030 impl actr_runtime_mailbox::MailboxDepthObserver for EnqueueObserver {
2031 fn on_depth_change(&self, queued_messages: usize) {
2032 (self.fire)(queued_messages);
2033 }
2034 }
2035
2036 let installed = {
2037 let observer: Arc<dyn actr_runtime_mailbox::MailboxDepthObserver> =
2038 Arc::new(EnqueueObserver {
2039 fire: fire_if_rising.clone(),
2040 });
2041 mailbox.set_depth_observer(observer)
2042 };
2043
2044 if installed {
2045 tracing::debug!("mailbox backpressure watchdog: push notifications enabled");
2046 } else {
2047 tracing::debug!(
2048 "mailbox backpressure watchdog: backend does not support push, falling back to 1 Hz polling"
2049 );
2050 let mailbox_for_poll = mailbox.clone();
2051 let shutdown_for_poll = shutdown.clone();
2052 let fire_for_poll = fire_if_rising.clone();
2053 let watchdog_handle = tokio::spawn(async move {
2054 let mut ticker = tokio::time::interval(Duration::from_secs(1));
2055 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
2056 loop {
2057 tokio::select! {
2058 _ = shutdown_for_poll.cancelled() => {
2059 tracing::debug!(
2060 "mailbox backpressure watchdog shutting down"
2061 );
2062 break;
2063 }
2064 _ = ticker.tick() => {
2065 let status = match mailbox_for_poll.status().await {
2066 Ok(s) => s,
2067 Err(e) => {
2068 tracing::debug!(?e, "mailbox status poll failed");
2069 continue;
2070 }
2071 };
2072 fire_for_poll(status.queued_messages as usize);
2073 }
2074 }
2075 }
2076 });
2077 task_handles.push(watchdog_handle);
2078 }
2079 }
2080
2081 tracing::info!("🔄 Starting Mailbox processing loop (State Path)");
2085 {
2086 let node = node_ref.clone();
2087 let mailbox = node_ref.mailbox.clone();
2088 let gate = node_ref.webrtc_gate.clone();
2089 let shutdown = shutdown_token.clone();
2090
2091 let mailbox_handle = tokio::spawn(async move {
2092 loop {
2093 tokio::select! {
2094 _ = shutdown.cancelled() => {
2096 tracing::info!("📭 Mailbox loop received shutdown signal");
2097 break;
2098 }
2099 result = mailbox.dequeue() => {
2101 match result {
2102 Ok(messages) => {
2103 if messages.is_empty() {
2104 tokio::time::sleep(Duration::from_millis(10)).await;
2106 continue;
2107 }
2108 tracing::debug!("📬 Mailbox dequeue: {} messages", messages.len());
2109
2110 for msg_record in messages {
2112 match RpcEnvelope::decode(&msg_record.payload[..]) {
2114 Ok(envelope) => {
2115 let request_id = envelope.request_id.clone();
2116 let queue_latency_ms = (chrono::Utc::now() - msg_record.created_at).num_milliseconds();
2117 tracing::info!(request_id = %request_id, queue_latency_ms = queue_latency_ms, "rpc.mailbox.dequeued");
2118
2119 tracing::debug!("📦 Processing message: request_id={}", request_id);
2120 #[cfg(feature = "opentelemetry")]
2121 let span = {
2122 let actr_id_str = node.actor_id.as_ref().map(|id| id.to_string()).unwrap_or_default();
2123 let span = tracing::info_span!("ActrNode.mailbox_receive", actr_id = %actr_id_str, request_id = %request_id, queue_wait_ms = queue_latency_ms);
2124 set_parent_from_rpc_envelope(&span, &envelope);
2125 span
2126 };
2127
2128 let caller_id_result = ActrId::decode(&msg_record.from[..]);
2130 let caller_id_ref = caller_id_result.as_ref().ok();
2131
2132 if caller_id_ref.is_none() {
2133 tracing::warn!(
2134 request_id = %request_id,
2135 "⚠️ Failed to decode caller_id from MessageRecord.from"
2136 );
2137 }
2138
2139 let handle_incoming_fut = node.handle_incoming(envelope.clone(), caller_id_ref);
2141 #[cfg(feature = "opentelemetry")]
2142 let handle_incoming_fut = handle_incoming_fut.instrument(span.clone());
2143
2144 match handle_incoming_fut.await {
2145 Ok(response_bytes) => {
2146 if let Some(ref gate) = gate {
2148 match caller_id_result {
2150 Ok(caller) => {
2151 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
2153 let mut response_envelope = RpcEnvelope {
2154 request_id, route_key: envelope.route_key.clone(),
2156 payload: Some(response_bytes),
2157 error: None,
2158 traceparent: envelope.traceparent.clone(),
2159 tracestate: envelope.tracestate.clone(),
2160 metadata: Vec::new(), timeout_ms: 30000,
2162 };
2163 #[cfg(feature = "opentelemetry")]
2165 inject_span_context_to_rpc(&span, &mut response_envelope);
2166
2167 let send_response_fut = gate.send_response(&caller, response_envelope);
2168 #[cfg(feature = "opentelemetry")]
2169 let send_response_fut = send_response_fut.instrument(span);
2170 if let Err(e) = send_response_fut.await {
2171 tracing::error!(
2172 severity = 7,
2173 error_category = "transport_error",
2174 request_id = %envelope.request_id,
2175 "❌ Failed to send response: {:?}",
2176 e
2177 );
2178 }
2179 }
2180 Err(e) => {
2181 tracing::error!(
2182 severity = 8,
2183 error_category = "protobuf_decode",
2184 request_id = %envelope.request_id,
2185 "❌ Failed to decode caller_id: {:?}",
2186 e
2187 );
2188 }
2189 }
2190 }
2191
2192 if let Err(e) = mailbox.ack(msg_record.id).await {
2194 tracing::error!(
2195 severity = 9,
2196 error_category = "mailbox_error",
2197 request_id = %envelope.request_id,
2198 message_id = %msg_record.id,
2199 "❌ Mailbox ACK failed: {:?}",
2200 e
2201 );
2202 }
2203 }
2204 Err(e) => {
2205 tracing::error!(
2206 severity = 6,
2207 error_category = "handler_error",
2208 request_id = %envelope.request_id,
2209 route_key = %envelope.route_key,
2210 "❌ handle_incoming failed: {:?}", e
2211 );
2212 let _ = mailbox.ack(msg_record.id).await;
2215 }
2216 }
2217 }
2218 Err(e) => {
2219 tracing::error!(
2221 severity = 9,
2222 error_category = "protobuf_decode",
2223 message_id = %msg_record.id,
2224 "❌ Poison message: Failed to deserialize RpcEnvelope: {:?}",
2225 e
2226 );
2227
2228 use actr_runtime_mailbox::DlqRecord;
2230 use chrono::Utc;
2231 use uuid::Uuid;
2232
2233 let dlq_record = DlqRecord {
2234 id: Uuid::new_v4(),
2235 original_message_id: Some(msg_record.id.to_string()),
2236 from: Some(msg_record.from.clone()),
2237 to: node.actor_id.as_ref().map(|id| {
2238 let mut buf = Vec::new();
2239 id.encode(&mut buf).unwrap();
2240 buf
2241 }),
2242 raw_bytes: msg_record.payload.clone(),
2243 error_message: format!("Protobuf decode failed: {e}"),
2244 error_category: "protobuf_decode".to_string(),
2245 trace_id: format!("mailbox-{}", msg_record.id),
2246 request_id: None,
2247 created_at: Utc::now(),
2248 redrive_attempts: 0,
2249 last_redrive_at: None,
2250 context: Some(format!(
2251 r#"{{"source":"mailbox","priority":"{}"}}"#,
2252 match msg_record.priority {
2253 actr_runtime_mailbox::MessagePriority::High => "high",
2254 actr_runtime_mailbox::MessagePriority::Normal => "normal",
2255 }
2256 )),
2257 };
2258
2259 if let Err(dlq_err) = node.dlq.enqueue(dlq_record).await {
2260 tracing::error!(
2261 severity = 10,
2262 "❌ CRITICAL: Failed to write poison message to DLQ: {:?}",
2263 dlq_err
2264 );
2265 } else {
2266 tracing::warn!(
2267 severity = 9,
2268 "☠️ Poison message moved to DLQ: message_id={}",
2269 msg_record.id
2270 );
2271 }
2272
2273 let _ = mailbox.ack(msg_record.id).await;
2275 }
2276 }
2277 }
2278 }
2279 Err(e) => {
2280 tracing::error!(
2281 severity = 9,
2282 error_category = "mailbox_error",
2283 "❌ Mailbox dequeue failed: {:?}", e
2284 );
2285 tokio::time::sleep(Duration::from_secs(1)).await;
2286 }
2287 }
2288 }
2289 }
2290 }
2291 tracing::info!("✅ Mailbox processing loop terminated gracefully");
2292 });
2293
2294 task_handles.push(mailbox_handle);
2295 }
2296 tracing::info!("✅ Mailbox processing loop started");
2297 tracing::info!("✅ ActrNode started successfully");
2298
2299 {
2300 let ready_ctx = bootstrap_ctx_builder
2301 .build_bootstrap(&actor_id, &credential_state.credential().await);
2302 let invocation = lifecycle_invocation(&actor_id, "lifecycle:on_ready");
2303 let call_executor =
2304 lifecycle_host_abi(ready_ctx.clone(), node_ref.workload_dispatch.clone());
2305 let mut workload = node_ref.workload_dispatch.lock().await;
2306 if let Err(e) = crate::lifecycle::hooks::call_lifecycle_hook(
2307 "on_ready",
2308 workload.on_ready(ready_ctx, invocation, &call_executor),
2309 )
2310 .await
2311 {
2312 tracing::warn!(error = %e, "workload on_ready returned Err");
2313 }
2314 }
2315
2316 let shared = Arc::new(ActrRefShared {
2318 actor_id,
2319 bootstrap_ctx_builder,
2320 credential_state,
2321 shutdown_token,
2322 task_handles: Mutex::new(task_handles),
2323 });
2324
2325 tracing::info!("✅ ActrRef created (Shell → Guest communication handle)");
2327
2328 Ok(ActrRef { shared })
2329 }
2330}