Skip to main content

actr_hyper/
context.rs

1//! Runtime Context Implementation
2//!
3//! Implements the Context trait defined in actr-framework.
4
5use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::Gate;
7use crate::wire::webrtc::SignalingClient;
8#[cfg(feature = "opentelemetry")]
9use crate::wire::webrtc::trace::inject_span_context_to_rpc;
10use actr_config::lock::LockFile;
11use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
12use actr_protocol::{
13    AIdCredential, ActorResult, ActrError, ActrId, ActrType, PayloadType, RouteCandidatesRequest,
14    RpcEnvelope, RpcRequest, route_candidates_request,
15};
16use async_trait::async_trait;
17use futures_util::future::BoxFuture;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// RuntimeContext - Runtime's implementation of Context trait
23///
24/// # Design Features
25///
26/// - **Zero vtable**: internally uses Gate enum dispatch (not dyn)
27/// - **Smart routing**: automatically selects Host or Peer based on Dest
28/// - **Full implementation**: contains complete call/tell logic (encode, send, decode)
29/// - **Type safety**: generic methods provide compile-time type checking
30///
31/// # Performance
32///
33/// - Gate is an enum, uses static dispatch
34/// - Compiler can fully inline the entire call chain
35/// - Zero virtual function call overhead
36#[derive(Clone)]
37pub struct RuntimeContext {
38    self_id: ActrId,
39    caller_id: Option<ActrId>,
40    request_id: String,
41    inproc_gate: Gate,          // Shell/Local calls - immediately available
42    outproc_gate: Option<Gate>, // Remote Actor calls - lazily initialized
43    data_stream_registry: Arc<DataStreamRegistry>, // DataStream callback registry
44    media_frame_registry: Arc<MediaFrameRegistry>, // MediaTrack callback registry
45    signaling_client: Arc<dyn SignalingClient>,
46    credential: AIdCredential,
47    actr_lock: Option<Arc<LockFile>>, // packaged manifest.lock.toml for fingerprint lookups
48    /// Shared map of discovered direct-connect WebSocket URLs, keyed by ActrId.
49    /// Populated by `discover_route_candidate` from the signaling `ws_address_map`,
50    /// then read by `DefaultWireBuilder` when establishing outbound connections.
51    discovered_ws_addresses: Arc<RwLock<HashMap<ActrId, String>>>,
52}
53
54impl RuntimeContext {
55    /// Create a new `RuntimeContext`.
56    ///
57    /// # Parameters
58    ///
59    /// - `self_id`: ID of the current actor
60    /// - `caller_id`: optional caller actor ID
61    /// - `request_id`: unique ID for the current request
62    /// - `inproc_gate`: in-process gate, immediately available
63    /// - `outproc_gate`: cross-process gate, possibly `None` until WebRTC initialization completes
64    /// - `data_stream_registry`: callback registry for `DataStream`
65    /// - `media_frame_registry`: callback registry for `MediaTrack`
66    /// - `signaling_client`: signaling client used for route discovery
67    /// - `credential`: credentials used when calling signaling interfaces
68    /// - `actr_lock`: shared packaged `manifest.lock.toml` used for fingerprint lookup (wrapped in `Arc` so context clones stay cheap)
69    /// - `discovered_ws_addresses`: shared map written by `discover_route_candidate` and read by `DefaultWireBuilder`
70    #[allow(clippy::too_many_arguments)] // Internal API - all parameters are required
71    pub(crate) fn new(
72        self_id: ActrId,
73        caller_id: Option<ActrId>,
74        request_id: String,
75        inproc_gate: Gate,
76        outproc_gate: Option<Gate>,
77        data_stream_registry: Arc<DataStreamRegistry>,
78        media_frame_registry: Arc<MediaFrameRegistry>,
79        signaling_client: Arc<dyn SignalingClient>,
80        credential: AIdCredential,
81        actr_lock: Option<Arc<LockFile>>,
82        discovered_ws_addresses: Arc<RwLock<HashMap<ActrId, String>>>,
83    ) -> Self {
84        Self {
85            self_id,
86            caller_id,
87            request_id,
88            inproc_gate,
89            outproc_gate,
90            data_stream_registry,
91            media_frame_registry,
92            signaling_client,
93            credential,
94            actr_lock,
95            discovered_ws_addresses,
96        }
97    }
98
99    /// Select the appropriate gate based on `Dest`.
100    ///
101    /// - `Dest::Shell` -> `inproc_gate`
102    /// - `Dest::Local` -> `inproc_gate`
103    /// - `Dest::Actor(_)` -> `outproc_gate`, which must already be initialized
104    #[inline]
105    fn select_gate(&self, dest: &Dest) -> ActorResult<&Gate> {
106        match dest {
107            Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
108            Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
109                ActrError::Internal(
110                    "PeerGate not initialized yet (WebRTC setup in progress)".to_string(),
111                )
112            }),
113        }
114    }
115
116    /// Extract the target `ActrId` from `Dest`.
117    ///
118    /// - `Dest::Shell` -> `self_id` for reverse Workload-to-App calls
119    /// - `Dest::Local` -> `self_id` for local workload calls
120    /// - `Dest::Actor(id)` -> remote actor ID
121    #[inline]
122    fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
123        match dest {
124            Dest::Shell | Dest::Local => &self.self_id,
125            Dest::Actor(id) => id,
126        }
127    }
128
129    /// Execute a non-generic RPC request call (useful for language bindings).
130    #[cfg_attr(
131        feature = "opentelemetry",
132        tracing::instrument(
133            skip_all,
134            name = "RuntimeContext.call_raw",
135            fields(
136                actr_id = %self.self_id,
137                route_key = %route_key,
138            )
139        )
140    )]
141    pub async fn call_raw(
142        &self,
143        target: &Dest,
144        route_key: String,
145        payload_type: PayloadType,
146        payload: Bytes,
147        timeout_ms: i64,
148    ) -> ActorResult<Bytes> {
149        #[cfg(feature = "opentelemetry")]
150        use crate::wire::webrtc::trace::inject_span_context_to_rpc;
151
152        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
153        let mut envelope = RpcEnvelope {
154            route_key,
155            payload: Some(payload),
156            error: None,
157            traceparent: None,
158            tracestate: None,
159            request_id: uuid::Uuid::new_v4().to_string(),
160            metadata: vec![],
161            timeout_ms,
162        };
163        #[cfg(feature = "opentelemetry")]
164        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
165
166        let gate = self.select_gate(target)?;
167        let target_id = self.extract_target_id(target);
168        gate.send_request_with_type(target_id, payload_type, envelope)
169            .await
170    }
171
172    /// Execute a non-generic RPC message call (fire-and-forget, useful for language bindings).
173    #[cfg_attr(
174        feature = "opentelemetry",
175        tracing::instrument(
176            skip_all,
177            name = "RuntimeContext.tell_raw",
178            fields(
179                actr_id = %self.self_id,
180                route_key = %route_key,
181            )
182        )
183    )]
184    pub async fn tell_raw(
185        &self,
186        target: &Dest,
187        route_key: String,
188        payload_type: PayloadType,
189        payload: Bytes,
190    ) -> ActorResult<()> {
191        #[cfg(feature = "opentelemetry")]
192        use crate::wire::webrtc::trace::inject_span_context_to_rpc;
193
194        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
195        let mut envelope = RpcEnvelope {
196            route_key,
197            payload: Some(payload),
198            error: None,
199            traceparent: None,
200            tracestate: None,
201            request_id: uuid::Uuid::new_v4().to_string(),
202            metadata: vec![],
203            timeout_ms: 0,
204        };
205        #[cfg(feature = "opentelemetry")]
206        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
207
208        let gate = self.select_gate(target)?;
209        let target_id = self.extract_target_id(target);
210        gate.send_message_with_type(target_id, payload_type, envelope)
211            .await
212    }
213
214    /// Send DataStream with an explicit payload type (lane selection).
215    ///
216    /// Convenience wrapper for language bindings that prefer positional `payload_type`
217    /// before `chunk`. Equivalent to calling `Context::send_data_stream` directly.
218    pub async fn send_data_stream_with_type(
219        &self,
220        target: &Dest,
221        payload_type: actr_protocol::PayloadType,
222        chunk: DataStream,
223    ) -> ActorResult<()> {
224        use actr_protocol::prost::Message as ProstMessage;
225
226        let payload = chunk.encode_to_vec();
227        let stream_id = chunk.stream_id.as_str();
228
229        let gate = self.select_gate(target)?;
230        let target_id = self.extract_target_id(target);
231
232        gate.send_data_stream(
233            target_id,
234            payload_type,
235            stream_id,
236            bytes::Bytes::from(payload),
237        )
238        .await
239    }
240
241    /// Get dependency fingerprint from the packaged manifest.lock.toml
242    fn get_dependency_fingerprint(&self, target_type: &ActrType) -> Option<String> {
243        let actr_lock = self.actr_lock.as_ref()?;
244
245        let key = target_type.to_string_repr();
246
247        // Try by full key
248        if let Some(dep) = actr_lock.get_dependency(&key) {
249            return Some(dep.fingerprint.clone());
250        }
251
252        // Fallback to scanning dependencies when the exact key is not present.
253        for dep in &actr_lock.dependencies {
254            if Self::matches_dependency_actr_type(&dep.actr_type, target_type) {
255                return Some(dep.fingerprint.clone());
256            }
257        }
258
259        None
260    }
261
262    fn matches_dependency_actr_type(raw: &str, target_type: &ActrType) -> bool {
263        let Ok(dep_type) = ActrType::from_string_repr(raw) else {
264            return false;
265        };
266
267        dep_type == *target_type
268    }
269
270    /// Internal: Send discovery request to signaling server
271    async fn send_discovery_request(
272        &self,
273        target_type: &ActrType,
274        candidate_count: u32,
275        client_fingerprint: String,
276    ) -> ActorResult<InternalDiscoveryResult> {
277        let criteria = route_candidates_request::NodeSelectionCriteria {
278            candidate_count,
279            ranking_factors: Vec::new(),
280            minimal_dependency_requirement: None,
281            minimal_health_requirement: None,
282        };
283
284        let request = RouteCandidatesRequest {
285            target_type: target_type.clone(),
286            criteria: Some(criteria),
287            client_location: None,
288            client_fingerprint,
289        };
290
291        let response = self
292            .signaling_client
293            .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
294            .await
295            .map_err(|e| ActrError::Unavailable(format!("Route candidates request failed: {e}")))?;
296
297        match response.result {
298            Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
299                Ok(InternalDiscoveryResult {
300                    candidates: success.candidates,
301                    ws_address_map: success.ws_address_map,
302                })
303            }
304            Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
305                Err(ActrError::Unavailable(format!(
306                    "Route candidates error {}: {}",
307                    err.code, err.message
308                )))
309            }
310            None => Err(ActrError::Unavailable(
311                "Invalid route candidates response: missing result".to_string(),
312            )),
313        }
314    }
315}
316
317/// Internal discovery result structure
318struct InternalDiscoveryResult {
319    candidates: Vec<ActrId>,
320    /// Direct-connect WebSocket URLs returned alongside candidates.
321    ws_address_map: Vec<actr_protocol::WsAddressEntry>,
322}
323
324/// Template used to materialize `RuntimeContext` instances for lifecycle
325/// bootstrap / observation paths (on_start / on_stop, signaling hooks, WebRTC
326/// hooks, ActrRef::app_context, ...).
327///
328/// Unlike the per-request dispatch path, which constructs `RuntimeContext`
329/// directly from `Inner`, this builder is a **detachable snapshot** of the
330/// handles needed to build a context. It is cloned into long-lived hook
331/// closures and into `ActrRefShared` so those paths don't need to retain a
332/// reference back to `Inner`.
333///
334/// # Fields
335///
336/// Mirrors `RuntimeContext`'s non-per-request state. `outproc_gate` is
337/// `Option` because hook builders can be captured *before* WebRTC
338/// initialization finishes; such snapshots will simply emit contexts with
339/// `outproc_gate = None`, matching the pre-existing semantics.
340#[derive(Clone)]
341pub(crate) struct BootstrapContextBuilder {
342    inproc_gate: Gate,
343    outproc_gate: Option<Gate>,
344    data_stream_registry: Arc<DataStreamRegistry>,
345    media_frame_registry: Arc<MediaFrameRegistry>,
346    signaling_client: Arc<dyn SignalingClient>,
347    actr_lock: Option<Arc<LockFile>>,
348    /// Shared map populated by discover_route_candidate; forwarded into each
349    /// RuntimeContext so discovery results are visible to DefaultWireBuilder.
350    discovered_ws_addresses: Arc<RwLock<HashMap<ActrId, String>>>,
351}
352
353impl BootstrapContextBuilder {
354    /// Assemble a new builder from the runtime handles. All parameters are
355    /// snapshotted by clone; later mutations on the origin (e.g. the node's
356    /// own `actr_lock`) are intentionally not observed — callers that need
357    /// a fresh snapshot must re-build.
358    #[allow(clippy::too_many_arguments)]
359    pub(crate) fn new(
360        inproc_gate: Gate,
361        outproc_gate: Option<Gate>,
362        data_stream_registry: Arc<DataStreamRegistry>,
363        media_frame_registry: Arc<MediaFrameRegistry>,
364        signaling_client: Arc<dyn SignalingClient>,
365        actr_lock: Option<Arc<LockFile>>,
366        discovered_ws_addresses: Arc<RwLock<HashMap<ActrId, String>>>,
367    ) -> Self {
368        Self {
369            inproc_gate,
370            outproc_gate,
371            data_stream_registry,
372            media_frame_registry,
373            signaling_client,
374            actr_lock,
375            discovered_ws_addresses,
376        }
377    }
378
379    /// Materialize a bootstrap `RuntimeContext` for lifecycle hooks.
380    ///
381    /// The produced context has no caller (`caller_id = None`) and a freshly
382    /// generated `request_id`; it is intended for on_start / on_stop /
383    /// transport-event observation where no inbound envelope drives the
384    /// request identity.
385    pub(crate) fn build_bootstrap(
386        &self,
387        self_id: &ActrId,
388        credential: &AIdCredential,
389    ) -> RuntimeContext {
390        RuntimeContext::new(
391            self_id.clone(),
392            None,
393            uuid::Uuid::new_v4().to_string(),
394            self.inproc_gate.clone(),
395            self.outproc_gate.clone(),
396            self.data_stream_registry.clone(),
397            self.media_frame_registry.clone(),
398            self.signaling_client.clone(),
399            credential.clone(),
400            self.actr_lock.clone(),
401            self.discovered_ws_addresses.clone(),
402        )
403    }
404}
405
406#[async_trait]
407impl Context for RuntimeContext {
408    // ========== Data Access Methods ==========
409
410    fn self_id(&self) -> &ActrId {
411        &self.self_id
412    }
413
414    fn caller_id(&self) -> Option<&ActrId> {
415        self.caller_id.as_ref()
416    }
417
418    fn request_id(&self) -> &str {
419        &self.request_id
420    }
421
422    // ========== Communication Methods ==========
423    #[cfg_attr(
424        feature = "opentelemetry",
425        tracing::instrument(
426            skip_all,
427            name = "RuntimeContext.call",
428            fields(actr_id = %self.self_id)
429        )
430    )]
431    async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
432        use actr_protocol::prost::Message as ProstMessage;
433
434        // 1. Encode the request as protobuf bytes.
435        let payload: Bytes = request.encode_to_vec().into();
436
437        // 2. Get the compile-time route key from the RpcRequest trait.
438        let route_key = R::route_key().to_string();
439
440        // 3. Build the RpcEnvelope with W3C tracing fields.
441        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
442        let mut envelope = RpcEnvelope {
443            route_key,
444            payload: Some(payload),
445            error: None,
446            traceparent: None,
447            tracestate: None,
448            request_id: uuid::Uuid::new_v4().to_string(), // Generate a new request_id.
449            metadata: vec![],
450            timeout_ms: 30000, // Default to a 30-second timeout.
451        };
452        // Inject tracing context from current span
453        #[cfg(feature = "opentelemetry")]
454        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
455
456        // 4. Select a gate from Dest and extract the target ActrId.
457        let gate = self.select_gate(target)?;
458        let target_id = self.extract_target_id(target);
459
460        // 5. Send via Gate enum dispatch without virtual calls.
461        // Respect request's declared payload type (lane selection)
462        let response_bytes = gate
463            .send_request_with_type(target_id, R::payload_type(), envelope)
464            .await?;
465
466        // 6. Decode the typed response.
467        R::Response::decode(&*response_bytes).map_err(|e| {
468            ActrError::DecodeFailure(format!(
469                "Failed to decode {}: {}",
470                std::any::type_name::<R::Response>(),
471                e
472            ))
473        })
474    }
475
476    #[cfg_attr(
477        feature = "opentelemetry",
478        tracing::instrument(
479            skip_all,
480            name = "RuntimeContext.tell",
481            fields(actr_id = %self.self_id)
482        )
483    )]
484    async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
485        // 1. Encode the message.
486        let payload: Bytes = message.encode_to_vec().into();
487
488        // 2. Get the route key.
489        let route_key = R::route_key().to_string();
490
491        // 3. Build the RpcEnvelope for fire-and-forget delivery.
492        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
493        let mut envelope = RpcEnvelope {
494            route_key,
495            payload: Some(payload),
496            error: None,
497            traceparent: None,
498            tracestate: None,
499            request_id: uuid::Uuid::new_v4().to_string(),
500            metadata: vec![],
501            timeout_ms: 0, // Zero means no response is expected.
502        };
503        // Inject tracing context from current span
504        #[cfg(feature = "opentelemetry")]
505        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
506
507        // 4. Select a gate from Dest and extract the target ActrId.
508        let gate = self.select_gate(target)?;
509        let target_id = self.extract_target_id(target);
510
511        // 5. Dispatch through the Gate enum while preserving payload type.
512        gate.send_message_with_type(target_id, R::payload_type(), envelope)
513            .await
514    }
515
516    // ========== Fast Path: DataStream Methods ==========
517
518    async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
519    where
520        F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
521    {
522        tracing::debug!(
523            "📊 Registering DataStream callback for stream_id: {}",
524            stream_id
525        );
526        self.data_stream_registry
527            .register(stream_id, Arc::new(callback));
528        Ok(())
529    }
530
531    async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
532        tracing::debug!(
533            "🚫 Unregistering DataStream callback for stream_id: {}",
534            stream_id
535        );
536        self.data_stream_registry.unregister(stream_id);
537        Ok(())
538    }
539
540    async fn send_data_stream(
541        &self,
542        target: &Dest,
543        chunk: DataStream,
544        payload_type: actr_protocol::PayloadType,
545    ) -> ActorResult<()> {
546        use actr_protocol::prost::Message as ProstMessage;
547
548        // 1. Serialize DataStream to bytes
549        let payload = chunk.encode_to_vec();
550        let stream_id = chunk.stream_id.as_str();
551
552        tracing::debug!(
553            "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
554            stream_id,
555            chunk.sequence,
556            payload.len()
557        );
558
559        // 2. Select gate based on Dest
560        let gate = self.select_gate(target)?;
561        let target_id = self.extract_target_id(target);
562
563        // 3. Send via Gate with the caller-specified PayloadType
564        gate.send_data_stream(
565            target_id,
566            payload_type,
567            stream_id,
568            bytes::Bytes::from(payload),
569        )
570        .await
571    }
572
573    #[cfg_attr(
574        feature = "opentelemetry",
575        tracing::instrument(
576            skip_all,
577            name = "RuntimeContext.discover_route_candidate",
578            fields(
579                actr_id = %self.self_id,
580                target_type = %target_type,
581            )
582        )
583    )]
584    async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
585        if !self.signaling_client.is_connected() {
586            return Err(ActrError::Unavailable(
587                "Signaling client is not connected.".to_string(),
588            ));
589        }
590
591        let service_name = format!("{}:{}", target_type.manufacturer, target_type.name);
592
593        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
594        // Step 1: Get fingerprint from manifest.lock.toml (when available)
595        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
596        let client_fingerprint = match self.get_dependency_fingerprint(target_type) {
597            Some(fingerprint) => fingerprint,
598            None => {
599                if self.actr_lock.is_none() {
600                    tracing::debug!(
601                        "manifest.lock.toml not loaded; sending discovery without fingerprint for '{}'",
602                        service_name
603                    );
604                    String::new()
605                } else {
606                    tracing::error!(
607                        severity = 10,
608                        error_category = "dependency_missing",
609                        "❌ DEPENDENCY NOT FOUND: Service '{}' is not declared in manifest.lock.toml.\n\
610                         Please run 'actr deps install' to generate the lock file with all dependencies.",
611                        service_name
612                    );
613                    return Err(ActrError::DependencyNotFound {
614                        service_name: service_name.clone(),
615                        message: format!(
616                            "Dependency '{}' not found in manifest.lock.toml. Run 'actr deps install' to resolve dependencies.",
617                            service_name
618                        ),
619                    });
620                }
621            }
622        };
623
624        if !client_fingerprint.is_empty() {
625            tracing::debug!(
626                "📋 Found dependency fingerprint for '{}': {}",
627                service_name,
628                &client_fingerprint[..20.min(client_fingerprint.len())]
629            );
630        }
631
632        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
633        // Step 2: Send discovery request to signaling server
634        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
635        let result = self
636            .send_discovery_request(target_type, 1, client_fingerprint)
637            .await?;
638
639        tracing::info!(
640            "Discovery result [{}]: {} candidates, {} ws_address entries",
641            service_name,
642            result.candidates.len(),
643            result.ws_address_map.len(),
644        );
645
646        // Populate the shared discovered_ws_addresses map so DefaultWireBuilder
647        // can use direct WebSocket connections instead of WebRTC for peers that
648        // advertise a ws:// address through the signaling server.
649        if !result.ws_address_map.is_empty() {
650            let mut map = self.discovered_ws_addresses.write().await;
651            for entry in result.ws_address_map {
652                if let Some(url) = entry.ws_address {
653                    tracing::debug!(
654                        actor_id = ?entry.candidate_id,
655                        ws_url = %url,
656                        "discovered direct WebSocket address",
657                    );
658                    map.insert(entry.candidate_id, url);
659                }
660            }
661        }
662
663        result.candidates.into_iter().next().ok_or_else(|| {
664            ActrError::NotFound(format!(
665                "No route candidates for type {}/{}",
666                target_type.manufacturer, target_type.name
667            ))
668        })
669    }
670
671    async fn call_raw(
672        &self,
673        target: &ActrId,
674        route_key: &str,
675        payload: Bytes,
676    ) -> ActorResult<Bytes> {
677        // Guest-facing trait entry: remote raw RPC with reliable lane and a
678        // 30 s default timeout. Delegate to the inherent `call_raw` so both
679        // entry points share one RpcEnvelope construction / gate-selection
680        // path and stay in sync.
681        RuntimeContext::call_raw(
682            self,
683            &Dest::Actor(target.clone()),
684            route_key.to_string(),
685            PayloadType::RpcReliable,
686            payload,
687            30_000,
688        )
689        .await
690    }
691
692    // ========== Fast Path: MediaTrack Methods ==========
693
694    async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
695    where
696        F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
697    {
698        tracing::debug!(
699            "📹 Registering MediaTrack callback for track_id: {}",
700            track_id
701        );
702        self.media_frame_registry
703            .register(track_id, Arc::new(callback));
704        Ok(())
705    }
706
707    async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
708        tracing::debug!(
709            "📹 Unregistering MediaTrack callback for track_id: {}",
710            track_id
711        );
712        self.media_frame_registry.unregister(track_id);
713        Ok(())
714    }
715
716    async fn send_media_sample(
717        &self,
718        target: &Dest,
719        track_id: &str,
720        sample: MediaSample,
721    ) -> ActorResult<()> {
722        // 1. Select appropriate gate based on Dest
723        let gate = self.select_gate(target)?;
724
725        // 2. Extract target ActrId
726        let target_id = self.extract_target_id(target);
727
728        // 3. Send via Gate (delegates to WebRTC Track)
729        gate.send_media_sample(target_id, track_id, sample).await
730    }
731
732    async fn add_media_track(
733        &self,
734        target: &Dest,
735        track_id: &str,
736        codec: &str,
737        media_type: &str,
738    ) -> ActorResult<()> {
739        let gate = self.select_gate(target)?;
740        let target_id = self.extract_target_id(target);
741        gate.add_media_track(target_id, track_id, codec, media_type)
742            .await
743    }
744
745    async fn remove_media_track(&self, target: &Dest, track_id: &str) -> ActorResult<()> {
746        let gate = self.select_gate(target)?;
747        let target_id = self.extract_target_id(target);
748        gate.remove_media_track(target_id, track_id).await
749    }
750}