1use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::Gate;
7use crate::wire::webrtc::SignalingClient;
8#[cfg(feature = "opentelemetry")]
9use crate::wire::webrtc::trace::inject_span_context_to_rpc;
10use actr_config::lock::LockFile;
11use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
12use actr_protocol::{
13 AIdCredential, ActorResult, ActrError, ActrId, ActrType, PayloadType, RouteCandidatesRequest,
14 RpcEnvelope, RpcRequest, route_candidates_request,
15};
16use async_trait::async_trait;
17use futures_util::future::BoxFuture;
18use std::sync::Arc;
19
20#[derive(Clone)]
35pub struct RuntimeContext {
36 self_id: ActrId,
37 caller_id: Option<ActrId>,
38 request_id: String,
39 inproc_gate: Gate, outproc_gate: Option<Gate>, data_stream_registry: Arc<DataStreamRegistry>, media_frame_registry: Arc<MediaFrameRegistry>, signaling_client: Arc<dyn SignalingClient>,
44 credential: AIdCredential,
45 actr_lock: Option<Arc<LockFile>>, }
47
48impl RuntimeContext {
49 #[allow(clippy::too_many_arguments)] pub(crate) fn new(
65 self_id: ActrId,
66 caller_id: Option<ActrId>,
67 request_id: String,
68 inproc_gate: Gate,
69 outproc_gate: Option<Gate>,
70 data_stream_registry: Arc<DataStreamRegistry>,
71 media_frame_registry: Arc<MediaFrameRegistry>,
72 signaling_client: Arc<dyn SignalingClient>,
73 credential: AIdCredential,
74 actr_lock: Option<Arc<LockFile>>,
75 ) -> Self {
76 Self {
77 self_id,
78 caller_id,
79 request_id,
80 inproc_gate,
81 outproc_gate,
82 data_stream_registry,
83 media_frame_registry,
84 signaling_client,
85 credential,
86 actr_lock,
87 }
88 }
89
90 #[inline]
96 fn select_gate(&self, dest: &Dest) -> ActorResult<&Gate> {
97 match dest {
98 Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
99 Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
100 ActrError::Internal(
101 "PeerGate not initialized yet (WebRTC setup in progress)".to_string(),
102 )
103 }),
104 }
105 }
106
107 #[inline]
113 fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
114 match dest {
115 Dest::Shell | Dest::Local => &self.self_id,
116 Dest::Actor(id) => id,
117 }
118 }
119
120 #[cfg_attr(
122 feature = "opentelemetry",
123 tracing::instrument(
124 skip_all,
125 name = "RuntimeContext.call_raw",
126 fields(
127 actr_id = %self.self_id,
128 route_key = %route_key,
129 )
130 )
131 )]
132 pub async fn call_raw(
133 &self,
134 target: &Dest,
135 route_key: String,
136 payload_type: PayloadType,
137 payload: Bytes,
138 timeout_ms: i64,
139 ) -> ActorResult<Bytes> {
140 #[cfg(feature = "opentelemetry")]
141 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
142
143 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
144 let mut envelope = RpcEnvelope {
145 route_key,
146 payload: Some(payload),
147 error: None,
148 traceparent: None,
149 tracestate: None,
150 request_id: uuid::Uuid::new_v4().to_string(),
151 metadata: vec![],
152 timeout_ms,
153 };
154 #[cfg(feature = "opentelemetry")]
155 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
156
157 let gate = self.select_gate(target)?;
158 let target_id = self.extract_target_id(target);
159 gate.send_request_with_type(target_id, payload_type, envelope)
160 .await
161 }
162
163 #[cfg_attr(
165 feature = "opentelemetry",
166 tracing::instrument(
167 skip_all,
168 name = "RuntimeContext.tell_raw",
169 fields(
170 actr_id = %self.self_id,
171 route_key = %route_key,
172 )
173 )
174 )]
175 pub async fn tell_raw(
176 &self,
177 target: &Dest,
178 route_key: String,
179 payload_type: PayloadType,
180 payload: Bytes,
181 ) -> ActorResult<()> {
182 #[cfg(feature = "opentelemetry")]
183 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
184
185 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
186 let mut envelope = RpcEnvelope {
187 route_key,
188 payload: Some(payload),
189 error: None,
190 traceparent: None,
191 tracestate: None,
192 request_id: uuid::Uuid::new_v4().to_string(),
193 metadata: vec![],
194 timeout_ms: 0,
195 };
196 #[cfg(feature = "opentelemetry")]
197 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
198
199 let gate = self.select_gate(target)?;
200 let target_id = self.extract_target_id(target);
201 gate.send_message_with_type(target_id, payload_type, envelope)
202 .await
203 }
204
205 pub async fn send_data_stream_with_type(
210 &self,
211 target: &Dest,
212 payload_type: actr_protocol::PayloadType,
213 chunk: DataStream,
214 ) -> ActorResult<()> {
215 use actr_protocol::prost::Message as ProstMessage;
216
217 let payload = chunk.encode_to_vec();
218 let stream_id = chunk.stream_id.as_str();
219
220 let gate = self.select_gate(target)?;
221 let target_id = self.extract_target_id(target);
222
223 gate.send_data_stream(
224 target_id,
225 payload_type,
226 stream_id,
227 bytes::Bytes::from(payload),
228 )
229 .await
230 }
231
232 fn get_dependency_fingerprint(&self, target_type: &ActrType) -> Option<String> {
234 let actr_lock = self.actr_lock.as_ref()?;
235
236 let key = target_type.to_string_repr();
237
238 if let Some(dep) = actr_lock.get_dependency(&key) {
240 return Some(dep.fingerprint.clone());
241 }
242
243 for dep in &actr_lock.dependencies {
245 if Self::matches_dependency_actr_type(&dep.actr_type, target_type) {
246 return Some(dep.fingerprint.clone());
247 }
248 }
249
250 None
251 }
252
253 fn matches_dependency_actr_type(raw: &str, target_type: &ActrType) -> bool {
254 let Ok(dep_type) = ActrType::from_string_repr(raw) else {
255 return false;
256 };
257
258 dep_type == *target_type
259 }
260
261 async fn send_discovery_request(
263 &self,
264 target_type: &ActrType,
265 candidate_count: u32,
266 client_fingerprint: String,
267 ) -> ActorResult<InternalDiscoveryResult> {
268 let criteria = route_candidates_request::NodeSelectionCriteria {
269 candidate_count,
270 ranking_factors: Vec::new(),
271 minimal_dependency_requirement: None,
272 minimal_health_requirement: None,
273 };
274
275 let request = RouteCandidatesRequest {
276 target_type: target_type.clone(),
277 criteria: Some(criteria),
278 client_location: None,
279 client_fingerprint,
280 };
281
282 let response = self
283 .signaling_client
284 .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
285 .await
286 .map_err(|e| ActrError::Unavailable(format!("Route candidates request failed: {e}")))?;
287
288 match response.result {
289 Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
290 Ok(InternalDiscoveryResult {
291 candidates: success.candidates,
292 })
293 }
294 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
295 Err(ActrError::Unavailable(format!(
296 "Route candidates error {}: {}",
297 err.code, err.message
298 )))
299 }
300 None => Err(ActrError::Unavailable(
301 "Invalid route candidates response: missing result".to_string(),
302 )),
303 }
304 }
305}
306
/// Result of a successful route-candidates request, as produced by
/// `RuntimeContext::send_discovery_request`.
struct InternalDiscoveryResult {
    /// Candidate actor ids taken from the success response.
    candidates: Vec<ActrId>,
}
311
/// Holds the shared components from which `build_bootstrap` creates
/// `RuntimeContext` values.
///
/// Each field is cloned into the corresponding `RuntimeContext` field when a
/// context is built.
#[derive(Clone)]
pub(crate) struct BootstrapContextBuilder {
    /// Gate handed to the context as its in-process gate.
    inproc_gate: Gate,
    /// Gate handed to the context as its out-of-process gate, if present.
    outproc_gate: Option<Gate>,
    /// Shared registry for data-stream callbacks.
    data_stream_registry: Arc<DataStreamRegistry>,
    /// Shared registry for media-track callbacks.
    media_frame_registry: Arc<MediaFrameRegistry>,
    /// Shared signaling client.
    signaling_client: Arc<dyn SignalingClient>,
    /// Optional shared lock file.
    actr_lock: Option<Arc<LockFile>>,
}
337
338impl BootstrapContextBuilder {
339 #[allow(clippy::too_many_arguments)]
344 pub(crate) fn new(
345 inproc_gate: Gate,
346 outproc_gate: Option<Gate>,
347 data_stream_registry: Arc<DataStreamRegistry>,
348 media_frame_registry: Arc<MediaFrameRegistry>,
349 signaling_client: Arc<dyn SignalingClient>,
350 actr_lock: Option<Arc<LockFile>>,
351 ) -> Self {
352 Self {
353 inproc_gate,
354 outproc_gate,
355 data_stream_registry,
356 media_frame_registry,
357 signaling_client,
358 actr_lock,
359 }
360 }
361
362 pub(crate) fn build_bootstrap(
369 &self,
370 self_id: &ActrId,
371 credential: &AIdCredential,
372 ) -> RuntimeContext {
373 RuntimeContext::new(
374 self_id.clone(),
375 None,
376 uuid::Uuid::new_v4().to_string(),
377 self.inproc_gate.clone(),
378 self.outproc_gate.clone(),
379 self.data_stream_registry.clone(),
380 self.media_frame_registry.clone(),
381 self.signaling_client.clone(),
382 credential.clone(),
383 self.actr_lock.clone(),
384 )
385 }
386}
387
388#[async_trait]
389impl Context for RuntimeContext {
/// Returns a reference to this context's `self_id`.
fn self_id(&self) -> &ActrId {
    &self.self_id
}
395
/// Returns a reference to the stored caller id, or `None` when the context
/// was created without one.
fn caller_id(&self) -> Option<&ActrId> {
    self.caller_id.as_ref()
}
399
/// Returns the request id stored in this context.
fn request_id(&self) -> &str {
    &self.request_id
}
403
404 #[cfg_attr(
406 feature = "opentelemetry",
407 tracing::instrument(
408 skip_all,
409 name = "RuntimeContext.call",
410 fields(actr_id = %self.self_id)
411 )
412 )]
413 async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
414 use actr_protocol::prost::Message as ProstMessage;
415
416 let payload: Bytes = request.encode_to_vec().into();
418
419 let route_key = R::route_key().to_string();
421
422 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
424 let mut envelope = RpcEnvelope {
425 route_key,
426 payload: Some(payload),
427 error: None,
428 traceparent: None,
429 tracestate: None,
430 request_id: uuid::Uuid::new_v4().to_string(), metadata: vec![],
432 timeout_ms: 30000, };
434 #[cfg(feature = "opentelemetry")]
436 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
437
438 let gate = self.select_gate(target)?;
440 let target_id = self.extract_target_id(target);
441
442 let response_bytes = gate
445 .send_request_with_type(target_id, R::payload_type(), envelope)
446 .await?;
447
448 R::Response::decode(&*response_bytes).map_err(|e| {
450 ActrError::DecodeFailure(format!(
451 "Failed to decode {}: {}",
452 std::any::type_name::<R::Response>(),
453 e
454 ))
455 })
456 }
457
458 #[cfg_attr(
459 feature = "opentelemetry",
460 tracing::instrument(
461 skip_all,
462 name = "RuntimeContext.tell",
463 fields(actr_id = %self.self_id)
464 )
465 )]
466 async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
467 let payload: Bytes = message.encode_to_vec().into();
469
470 let route_key = R::route_key().to_string();
472
473 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
475 let mut envelope = RpcEnvelope {
476 route_key,
477 payload: Some(payload),
478 error: None,
479 traceparent: None,
480 tracestate: None,
481 request_id: uuid::Uuid::new_v4().to_string(),
482 metadata: vec![],
483 timeout_ms: 0, };
485 #[cfg(feature = "opentelemetry")]
487 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
488
489 let gate = self.select_gate(target)?;
491 let target_id = self.extract_target_id(target);
492
493 gate.send_message_with_type(target_id, R::payload_type(), envelope)
495 .await
496 }
497
498 async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
501 where
502 F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
503 {
504 tracing::debug!(
505 "📊 Registering DataStream callback for stream_id: {}",
506 stream_id
507 );
508 self.data_stream_registry
509 .register(stream_id, Arc::new(callback));
510 Ok(())
511 }
512
/// Removes the data-stream callback registered under `stream_id`.
///
/// Logs the request at debug level, forwards it to the data-stream
/// registry's `unregister`, and always returns `Ok(())`.
async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
    tracing::debug!(
        "🚫 Unregistering DataStream callback for stream_id: {}",
        stream_id
    );
    self.data_stream_registry.unregister(stream_id);
    Ok(())
}
521
522 async fn send_data_stream(
523 &self,
524 target: &Dest,
525 chunk: DataStream,
526 payload_type: actr_protocol::PayloadType,
527 ) -> ActorResult<()> {
528 use actr_protocol::prost::Message as ProstMessage;
529
530 let payload = chunk.encode_to_vec();
532 let stream_id = chunk.stream_id.as_str();
533
534 tracing::debug!(
535 "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
536 stream_id,
537 chunk.sequence,
538 payload.len()
539 );
540
541 let gate = self.select_gate(target)?;
543 let target_id = self.extract_target_id(target);
544
545 gate.send_data_stream(
547 target_id,
548 payload_type,
549 stream_id,
550 bytes::Bytes::from(payload),
551 )
552 .await
553 }
554
555 #[cfg_attr(
556 feature = "opentelemetry",
557 tracing::instrument(
558 skip_all,
559 name = "RuntimeContext.discover_route_candidate",
560 fields(
561 actr_id = %self.self_id,
562 target_type = %target_type,
563 )
564 )
565 )]
566 async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
567 if !self.signaling_client.is_connected() {
568 return Err(ActrError::Unavailable(
569 "Signaling client is not connected.".to_string(),
570 ));
571 }
572
573 let service_name = format!("{}:{}", target_type.manufacturer, target_type.name);
574
575 let client_fingerprint = match self.get_dependency_fingerprint(target_type) {
579 Some(fingerprint) => fingerprint,
580 None => {
581 if self.actr_lock.is_none() {
582 tracing::debug!(
583 "manifest.lock.toml not loaded; sending discovery without fingerprint for '{}'",
584 service_name
585 );
586 String::new()
587 } else {
588 tracing::error!(
589 severity = 10,
590 error_category = "dependency_missing",
591 "❌ DEPENDENCY NOT FOUND: Service '{}' is not declared in manifest.lock.toml.\n\
592 Please run 'actr deps install' to generate the lock file with all dependencies.",
593 service_name
594 );
595 return Err(ActrError::DependencyNotFound {
596 service_name: service_name.clone(),
597 message: format!(
598 "Dependency '{}' not found in manifest.lock.toml. Run 'actr deps install' to resolve dependencies.",
599 service_name
600 ),
601 });
602 }
603 }
604 };
605
606 if !client_fingerprint.is_empty() {
607 tracing::debug!(
608 "📋 Found dependency fingerprint for '{}': {}",
609 service_name,
610 &client_fingerprint[..20.min(client_fingerprint.len())]
611 );
612 }
613
614 let result = self
618 .send_discovery_request(target_type, 1, client_fingerprint)
619 .await?;
620
621 tracing::info!(
622 "Discovery result [{}]: {} candidates",
623 service_name,
624 result.candidates.len(),
625 );
626
627 result.candidates.into_iter().next().ok_or_else(|| {
628 ActrError::NotFound(format!(
629 "No route candidates for type {}/{}",
630 target_type.manufacturer, target_type.name
631 ))
632 })
633 }
634
635 async fn call_raw(
636 &self,
637 target: &ActrId,
638 route_key: &str,
639 payload: Bytes,
640 ) -> ActorResult<Bytes> {
641 RuntimeContext::call_raw(
646 self,
647 &Dest::Actor(target.clone()),
648 route_key.to_string(),
649 PayloadType::RpcReliable,
650 payload,
651 30_000,
652 )
653 .await
654 }
655
656 async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
659 where
660 F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
661 {
662 tracing::debug!(
663 "📹 Registering MediaTrack callback for track_id: {}",
664 track_id
665 );
666 self.media_frame_registry
667 .register(track_id, Arc::new(callback));
668 Ok(())
669 }
670
/// Removes the media-track callback registered under `track_id`.
///
/// Logs the request at debug level, forwards it to the media-frame
/// registry's `unregister`, and always returns `Ok(())`.
async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
    tracing::debug!(
        "📹 Unregistering MediaTrack callback for track_id: {}",
        track_id
    );
    self.media_frame_registry.unregister(track_id);
    Ok(())
}
679
680 async fn send_media_sample(
681 &self,
682 target: &Dest,
683 track_id: &str,
684 sample: MediaSample,
685 ) -> ActorResult<()> {
686 let gate = self.select_gate(target)?;
688
689 let target_id = self.extract_target_id(target);
691
692 gate.send_media_sample(target_id, track_id, sample).await
694 }
695
696 async fn add_media_track(
697 &self,
698 target: &Dest,
699 track_id: &str,
700 codec: &str,
701 media_type: &str,
702 ) -> ActorResult<()> {
703 let gate = self.select_gate(target)?;
704 let target_id = self.extract_target_id(target);
705 gate.add_media_track(target_id, track_id, codec, media_type)
706 .await
707 }
708
709 async fn remove_media_track(&self, target: &Dest, track_id: &str) -> ActorResult<()> {
710 let gate = self.select_gate(target)?;
711 let target_id = self.extract_target_id(target);
712 gate.remove_media_track(target_id, track_id).await
713 }
714}