1use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::Gate;
7use crate::wire::webrtc::SignalingClient;
8#[cfg(feature = "opentelemetry")]
9use crate::wire::webrtc::trace::inject_span_context_to_rpc;
10use actr_config::lock::LockFile;
11use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
12use actr_protocol::{
13 AIdCredential, ActorResult, ActrError, ActrId, ActrType, PayloadType, RouteCandidatesRequest,
14 RpcEnvelope, RpcRequest, route_candidates_request,
15};
16use async_trait::async_trait;
17use futures_util::future::BoxFuture;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22#[derive(Clone)]
37pub struct RuntimeContext {
38 self_id: ActrId,
39 caller_id: Option<ActrId>,
40 request_id: String,
41 inproc_gate: Gate, outproc_gate: Option<Gate>, data_stream_registry: Arc<DataStreamRegistry>, media_frame_registry: Arc<MediaFrameRegistry>, signaling_client: Arc<dyn SignalingClient>,
46 credential: AIdCredential,
47 actr_lock: Option<Arc<LockFile>>, discovered_ws_addresses: Arc<RwLock<HashMap<ActrId, String>>>,
52}
53
54impl RuntimeContext {
55 #[allow(clippy::too_many_arguments)] pub(crate) fn new(
72 self_id: ActrId,
73 caller_id: Option<ActrId>,
74 request_id: String,
75 inproc_gate: Gate,
76 outproc_gate: Option<Gate>,
77 data_stream_registry: Arc<DataStreamRegistry>,
78 media_frame_registry: Arc<MediaFrameRegistry>,
79 signaling_client: Arc<dyn SignalingClient>,
80 credential: AIdCredential,
81 actr_lock: Option<Arc<LockFile>>,
82 discovered_ws_addresses: Arc<RwLock<HashMap<ActrId, String>>>,
83 ) -> Self {
84 Self {
85 self_id,
86 caller_id,
87 request_id,
88 inproc_gate,
89 outproc_gate,
90 data_stream_registry,
91 media_frame_registry,
92 signaling_client,
93 credential,
94 actr_lock,
95 discovered_ws_addresses,
96 }
97 }
98
99 #[inline]
105 fn select_gate(&self, dest: &Dest) -> ActorResult<&Gate> {
106 match dest {
107 Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
108 Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
109 ActrError::Internal(
110 "PeerGate not initialized yet (WebRTC setup in progress)".to_string(),
111 )
112 }),
113 }
114 }
115
116 #[inline]
122 fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
123 match dest {
124 Dest::Shell | Dest::Local => &self.self_id,
125 Dest::Actor(id) => id,
126 }
127 }
128
129 #[cfg_attr(
131 feature = "opentelemetry",
132 tracing::instrument(
133 skip_all,
134 name = "RuntimeContext.call_raw",
135 fields(
136 actr_id = %self.self_id,
137 route_key = %route_key,
138 )
139 )
140 )]
141 pub async fn call_raw(
142 &self,
143 target: &Dest,
144 route_key: String,
145 payload_type: PayloadType,
146 payload: Bytes,
147 timeout_ms: i64,
148 ) -> ActorResult<Bytes> {
149 #[cfg(feature = "opentelemetry")]
150 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
151
152 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
153 let mut envelope = RpcEnvelope {
154 route_key,
155 payload: Some(payload),
156 error: None,
157 traceparent: None,
158 tracestate: None,
159 request_id: uuid::Uuid::new_v4().to_string(),
160 metadata: vec![],
161 timeout_ms,
162 };
163 #[cfg(feature = "opentelemetry")]
164 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
165
166 let gate = self.select_gate(target)?;
167 let target_id = self.extract_target_id(target);
168 gate.send_request_with_type(target_id, payload_type, envelope)
169 .await
170 }
171
172 #[cfg_attr(
174 feature = "opentelemetry",
175 tracing::instrument(
176 skip_all,
177 name = "RuntimeContext.tell_raw",
178 fields(
179 actr_id = %self.self_id,
180 route_key = %route_key,
181 )
182 )
183 )]
184 pub async fn tell_raw(
185 &self,
186 target: &Dest,
187 route_key: String,
188 payload_type: PayloadType,
189 payload: Bytes,
190 ) -> ActorResult<()> {
191 #[cfg(feature = "opentelemetry")]
192 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
193
194 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
195 let mut envelope = RpcEnvelope {
196 route_key,
197 payload: Some(payload),
198 error: None,
199 traceparent: None,
200 tracestate: None,
201 request_id: uuid::Uuid::new_v4().to_string(),
202 metadata: vec![],
203 timeout_ms: 0,
204 };
205 #[cfg(feature = "opentelemetry")]
206 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
207
208 let gate = self.select_gate(target)?;
209 let target_id = self.extract_target_id(target);
210 gate.send_message_with_type(target_id, payload_type, envelope)
211 .await
212 }
213
214 pub async fn send_data_stream_with_type(
219 &self,
220 target: &Dest,
221 payload_type: actr_protocol::PayloadType,
222 chunk: DataStream,
223 ) -> ActorResult<()> {
224 use actr_protocol::prost::Message as ProstMessage;
225
226 let payload = chunk.encode_to_vec();
227 let stream_id = chunk.stream_id.as_str();
228
229 let gate = self.select_gate(target)?;
230 let target_id = self.extract_target_id(target);
231
232 gate.send_data_stream(
233 target_id,
234 payload_type,
235 stream_id,
236 bytes::Bytes::from(payload),
237 )
238 .await
239 }
240
241 fn get_dependency_fingerprint(&self, target_type: &ActrType) -> Option<String> {
243 let actr_lock = self.actr_lock.as_ref()?;
244
245 let key = target_type.to_string_repr();
246
247 if let Some(dep) = actr_lock.get_dependency(&key) {
249 return Some(dep.fingerprint.clone());
250 }
251
252 for dep in &actr_lock.dependencies {
254 if Self::matches_dependency_actr_type(&dep.actr_type, target_type) {
255 return Some(dep.fingerprint.clone());
256 }
257 }
258
259 None
260 }
261
262 fn matches_dependency_actr_type(raw: &str, target_type: &ActrType) -> bool {
263 let Ok(dep_type) = ActrType::from_string_repr(raw) else {
264 return false;
265 };
266
267 dep_type == *target_type
268 }
269
270 async fn send_discovery_request(
272 &self,
273 target_type: &ActrType,
274 candidate_count: u32,
275 client_fingerprint: String,
276 ) -> ActorResult<InternalDiscoveryResult> {
277 let criteria = route_candidates_request::NodeSelectionCriteria {
278 candidate_count,
279 ranking_factors: Vec::new(),
280 minimal_dependency_requirement: None,
281 minimal_health_requirement: None,
282 };
283
284 let request = RouteCandidatesRequest {
285 target_type: target_type.clone(),
286 criteria: Some(criteria),
287 client_location: None,
288 client_fingerprint,
289 };
290
291 let response = self
292 .signaling_client
293 .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
294 .await
295 .map_err(|e| ActrError::Unavailable(format!("Route candidates request failed: {e}")))?;
296
297 match response.result {
298 Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
299 Ok(InternalDiscoveryResult {
300 candidates: success.candidates,
301 ws_address_map: success.ws_address_map,
302 })
303 }
304 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
305 Err(ActrError::Unavailable(format!(
306 "Route candidates error {}: {}",
307 err.code, err.message
308 )))
309 }
310 None => Err(ActrError::Unavailable(
311 "Invalid route candidates response: missing result".to_string(),
312 )),
313 }
314 }
315}
316
317struct InternalDiscoveryResult {
319 candidates: Vec<ActrId>,
320 ws_address_map: Vec<actr_protocol::WsAddressEntry>,
322}
323
324#[derive(Clone)]
341pub(crate) struct BootstrapContextBuilder {
342 inproc_gate: Gate,
343 outproc_gate: Option<Gate>,
344 data_stream_registry: Arc<DataStreamRegistry>,
345 media_frame_registry: Arc<MediaFrameRegistry>,
346 signaling_client: Arc<dyn SignalingClient>,
347 actr_lock: Option<Arc<LockFile>>,
348 discovered_ws_addresses: Arc<RwLock<HashMap<ActrId, String>>>,
351}
352
353impl BootstrapContextBuilder {
354 #[allow(clippy::too_many_arguments)]
359 pub(crate) fn new(
360 inproc_gate: Gate,
361 outproc_gate: Option<Gate>,
362 data_stream_registry: Arc<DataStreamRegistry>,
363 media_frame_registry: Arc<MediaFrameRegistry>,
364 signaling_client: Arc<dyn SignalingClient>,
365 actr_lock: Option<Arc<LockFile>>,
366 discovered_ws_addresses: Arc<RwLock<HashMap<ActrId, String>>>,
367 ) -> Self {
368 Self {
369 inproc_gate,
370 outproc_gate,
371 data_stream_registry,
372 media_frame_registry,
373 signaling_client,
374 actr_lock,
375 discovered_ws_addresses,
376 }
377 }
378
379 pub(crate) fn build_bootstrap(
386 &self,
387 self_id: &ActrId,
388 credential: &AIdCredential,
389 ) -> RuntimeContext {
390 RuntimeContext::new(
391 self_id.clone(),
392 None,
393 uuid::Uuid::new_v4().to_string(),
394 self.inproc_gate.clone(),
395 self.outproc_gate.clone(),
396 self.data_stream_registry.clone(),
397 self.media_frame_registry.clone(),
398 self.signaling_client.clone(),
399 credential.clone(),
400 self.actr_lock.clone(),
401 self.discovered_ws_addresses.clone(),
402 )
403 }
404}
405
406#[async_trait]
407impl Context for RuntimeContext {
408 fn self_id(&self) -> &ActrId {
411 &self.self_id
412 }
413
414 fn caller_id(&self) -> Option<&ActrId> {
415 self.caller_id.as_ref()
416 }
417
418 fn request_id(&self) -> &str {
419 &self.request_id
420 }
421
422 #[cfg_attr(
424 feature = "opentelemetry",
425 tracing::instrument(
426 skip_all,
427 name = "RuntimeContext.call",
428 fields(actr_id = %self.self_id)
429 )
430 )]
431 async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
432 use actr_protocol::prost::Message as ProstMessage;
433
434 let payload: Bytes = request.encode_to_vec().into();
436
437 let route_key = R::route_key().to_string();
439
440 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
442 let mut envelope = RpcEnvelope {
443 route_key,
444 payload: Some(payload),
445 error: None,
446 traceparent: None,
447 tracestate: None,
448 request_id: uuid::Uuid::new_v4().to_string(), metadata: vec![],
450 timeout_ms: 30000, };
452 #[cfg(feature = "opentelemetry")]
454 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
455
456 let gate = self.select_gate(target)?;
458 let target_id = self.extract_target_id(target);
459
460 let response_bytes = gate
463 .send_request_with_type(target_id, R::payload_type(), envelope)
464 .await?;
465
466 R::Response::decode(&*response_bytes).map_err(|e| {
468 ActrError::DecodeFailure(format!(
469 "Failed to decode {}: {}",
470 std::any::type_name::<R::Response>(),
471 e
472 ))
473 })
474 }
475
476 #[cfg_attr(
477 feature = "opentelemetry",
478 tracing::instrument(
479 skip_all,
480 name = "RuntimeContext.tell",
481 fields(actr_id = %self.self_id)
482 )
483 )]
484 async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
485 let payload: Bytes = message.encode_to_vec().into();
487
488 let route_key = R::route_key().to_string();
490
491 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
493 let mut envelope = RpcEnvelope {
494 route_key,
495 payload: Some(payload),
496 error: None,
497 traceparent: None,
498 tracestate: None,
499 request_id: uuid::Uuid::new_v4().to_string(),
500 metadata: vec![],
501 timeout_ms: 0, };
503 #[cfg(feature = "opentelemetry")]
505 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
506
507 let gate = self.select_gate(target)?;
509 let target_id = self.extract_target_id(target);
510
511 gate.send_message_with_type(target_id, R::payload_type(), envelope)
513 .await
514 }
515
516 async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
519 where
520 F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
521 {
522 tracing::debug!(
523 "📊 Registering DataStream callback for stream_id: {}",
524 stream_id
525 );
526 self.data_stream_registry
527 .register(stream_id, Arc::new(callback));
528 Ok(())
529 }
530
531 async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
532 tracing::debug!(
533 "🚫 Unregistering DataStream callback for stream_id: {}",
534 stream_id
535 );
536 self.data_stream_registry.unregister(stream_id);
537 Ok(())
538 }
539
540 async fn send_data_stream(
541 &self,
542 target: &Dest,
543 chunk: DataStream,
544 payload_type: actr_protocol::PayloadType,
545 ) -> ActorResult<()> {
546 use actr_protocol::prost::Message as ProstMessage;
547
548 let payload = chunk.encode_to_vec();
550 let stream_id = chunk.stream_id.as_str();
551
552 tracing::debug!(
553 "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
554 stream_id,
555 chunk.sequence,
556 payload.len()
557 );
558
559 let gate = self.select_gate(target)?;
561 let target_id = self.extract_target_id(target);
562
563 gate.send_data_stream(
565 target_id,
566 payload_type,
567 stream_id,
568 bytes::Bytes::from(payload),
569 )
570 .await
571 }
572
573 #[cfg_attr(
574 feature = "opentelemetry",
575 tracing::instrument(
576 skip_all,
577 name = "RuntimeContext.discover_route_candidate",
578 fields(
579 actr_id = %self.self_id,
580 target_type = %target_type,
581 )
582 )
583 )]
584 async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
585 if !self.signaling_client.is_connected() {
586 return Err(ActrError::Unavailable(
587 "Signaling client is not connected.".to_string(),
588 ));
589 }
590
591 let service_name = format!("{}:{}", target_type.manufacturer, target_type.name);
592
593 let client_fingerprint = match self.get_dependency_fingerprint(target_type) {
597 Some(fingerprint) => fingerprint,
598 None => {
599 if self.actr_lock.is_none() {
600 tracing::debug!(
601 "manifest.lock.toml not loaded; sending discovery without fingerprint for '{}'",
602 service_name
603 );
604 String::new()
605 } else {
606 tracing::error!(
607 severity = 10,
608 error_category = "dependency_missing",
609 "❌ DEPENDENCY NOT FOUND: Service '{}' is not declared in manifest.lock.toml.\n\
610 Please run 'actr deps install' to generate the lock file with all dependencies.",
611 service_name
612 );
613 return Err(ActrError::DependencyNotFound {
614 service_name: service_name.clone(),
615 message: format!(
616 "Dependency '{}' not found in manifest.lock.toml. Run 'actr deps install' to resolve dependencies.",
617 service_name
618 ),
619 });
620 }
621 }
622 };
623
624 if !client_fingerprint.is_empty() {
625 tracing::debug!(
626 "📋 Found dependency fingerprint for '{}': {}",
627 service_name,
628 &client_fingerprint[..20.min(client_fingerprint.len())]
629 );
630 }
631
632 let result = self
636 .send_discovery_request(target_type, 1, client_fingerprint)
637 .await?;
638
639 tracing::info!(
640 "Discovery result [{}]: {} candidates, {} ws_address entries",
641 service_name,
642 result.candidates.len(),
643 result.ws_address_map.len(),
644 );
645
646 if !result.ws_address_map.is_empty() {
650 let mut map = self.discovered_ws_addresses.write().await;
651 for entry in result.ws_address_map {
652 if let Some(url) = entry.ws_address {
653 tracing::debug!(
654 actor_id = ?entry.candidate_id,
655 ws_url = %url,
656 "discovered direct WebSocket address",
657 );
658 map.insert(entry.candidate_id, url);
659 }
660 }
661 }
662
663 result.candidates.into_iter().next().ok_or_else(|| {
664 ActrError::NotFound(format!(
665 "No route candidates for type {}/{}",
666 target_type.manufacturer, target_type.name
667 ))
668 })
669 }
670
671 async fn call_raw(
672 &self,
673 target: &ActrId,
674 route_key: &str,
675 payload: Bytes,
676 ) -> ActorResult<Bytes> {
677 RuntimeContext::call_raw(
682 self,
683 &Dest::Actor(target.clone()),
684 route_key.to_string(),
685 PayloadType::RpcReliable,
686 payload,
687 30_000,
688 )
689 .await
690 }
691
692 async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
695 where
696 F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
697 {
698 tracing::debug!(
699 "📹 Registering MediaTrack callback for track_id: {}",
700 track_id
701 );
702 self.media_frame_registry
703 .register(track_id, Arc::new(callback));
704 Ok(())
705 }
706
707 async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
708 tracing::debug!(
709 "📹 Unregistering MediaTrack callback for track_id: {}",
710 track_id
711 );
712 self.media_frame_registry.unregister(track_id);
713 Ok(())
714 }
715
716 async fn send_media_sample(
717 &self,
718 target: &Dest,
719 track_id: &str,
720 sample: MediaSample,
721 ) -> ActorResult<()> {
722 let gate = self.select_gate(target)?;
724
725 let target_id = self.extract_target_id(target);
727
728 gate.send_media_sample(target_id, track_id, sample).await
730 }
731
732 async fn add_media_track(
733 &self,
734 target: &Dest,
735 track_id: &str,
736 codec: &str,
737 media_type: &str,
738 ) -> ActorResult<()> {
739 let gate = self.select_gate(target)?;
740 let target_id = self.extract_target_id(target);
741 gate.add_media_track(target_id, track_id, codec, media_type)
742 .await
743 }
744
745 async fn remove_media_track(&self, target: &Dest, track_id: &str) -> ActorResult<()> {
746 let gate = self.select_gate(target)?;
747 let target_id = self.extract_target_id(target);
748 gate.remove_media_track(target_id, track_id).await
749 }
750}