actr_runtime/
context.rs

1//! Runtime Context Implementation
2//!
3//! Implements the Context trait defined in actr-framework.
4
5use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::lifecycle::compat_lock::{CompatLockManager, CompatibilityCheck};
7use crate::outbound::OutGate;
8use crate::wire::webrtc::SignalingClient;
9#[cfg(feature = "opentelemetry")]
10use crate::wire::webrtc::trace::inject_span_context_to_rpc;
11use actr_config::lock::LockFile;
12use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
13use actr_protocol::{
14    AIdCredential, ActorResult, ActrError, ActrId, ActrType, PayloadType, ProtocolError,
15    RouteCandidatesRequest, RpcEnvelope, RpcRequest, route_candidates_request,
16};
17use async_trait::async_trait;
18use futures_util::future::BoxFuture;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22/// RuntimeContext - Runtime's implementation of Context trait
23///
24/// # 设计特性
25///
26/// - **零虚函数**:内部使用 OutGate enum dispatch(非 dyn)
27/// - **智能路由**:根据 Dest 自动选择 InprocOut 或 OutprocOut
28/// - **完整实现**:包含 call/tell 的完整逻辑(编码、发送、解码)
29/// - **类型安全**:泛型方法提供编译时类型检查
30///
31/// # 性能
32///
33/// - OutGate 是 enum,使用静态分发
34/// - 编译器可完全内联整个调用链
35/// - 零虚函数调用开销
36#[derive(Clone)]
37pub struct RuntimeContext {
38    self_id: ActrId,
39    caller_id: Option<ActrId>,
40    request_id: String,
41    inproc_gate: OutGate,                          // Shell/Local 调用 - 立即可用
42    outproc_gate: Option<OutGate>,                 // 远程 Actor 调用 - 延迟初始化
43    data_stream_registry: Arc<DataStreamRegistry>, // DataStream 回调注册表
44    media_frame_registry: Arc<MediaFrameRegistry>, // MediaTrack 回调注册表
45    signaling_client: Arc<dyn SignalingClient>,
46    credential: AIdCredential,
47    actr_lock: Option<LockFile>, // Actr.lock.toml for fingerprint lookups
48    config_dir: Option<PathBuf>, // Config directory for compat.lock.toml
49}
50
51impl RuntimeContext {
52    /// 创建新的 RuntimeContext
53    ///
54    /// # 参数
55    ///
56    /// - `self_id`: 当前 Actor 的 ID
57    /// - `caller_id`: 调用方 Actor ID(可选)
58    /// - `request_id`: 当前请求唯一 ID
59    /// - `inproc_gate`: 进程内通信 gate(立即可用)
60    /// - `outproc_gate`: 跨进程通信 gate(可能为 None,等待 WebRTC 初始化)
61    /// - `data_stream_registry`: DataStream 回调注册表
62    /// - `media_frame_registry`: MediaTrack 回调注册表
63    /// - `signaling_client`: 用于路由发现的信令客户端
64    /// - `credential`: 该 Actor 的凭证(调用信令接口时使用)
65    /// - `actr_lock`: Actr.lock.toml 依赖配置(用于 fingerprint 查找)
66    /// - `config_dir`: 配置目录路径(用于 compat.lock.toml Fast Path 缓存)
67    #[allow(clippy::too_many_arguments)] // Internal API - all parameters are required
68    pub fn new(
69        self_id: ActrId,
70        caller_id: Option<ActrId>,
71        request_id: String,
72        inproc_gate: OutGate,
73        outproc_gate: Option<OutGate>,
74        data_stream_registry: Arc<DataStreamRegistry>,
75        media_frame_registry: Arc<MediaFrameRegistry>,
76        signaling_client: Arc<dyn SignalingClient>,
77        credential: AIdCredential,
78        actr_lock: Option<LockFile>,
79        config_dir: Option<PathBuf>,
80    ) -> Self {
81        Self {
82            self_id,
83            caller_id,
84            request_id,
85            inproc_gate,
86            outproc_gate,
87            data_stream_registry,
88            media_frame_registry,
89            signaling_client,
90            credential,
91            actr_lock,
92            config_dir,
93        }
94    }
95
96    /// 根据 Dest 选择合适的 gate
97    ///
98    /// - Dest::Shell → inproc_gate(立即可用)
99    /// - Dest::Local → inproc_gate(立即可用)
100    /// - Dest::Actor(_) → outproc_gate(需要检查是否已初始化)
101    #[inline]
102    fn select_gate(&self, dest: &Dest) -> ActorResult<&OutGate> {
103        match dest {
104            Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
105            Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
106                ProtocolError::Actr(ActrError::GateNotInitialized {
107                    message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
108                        .to_string(),
109                })
110            }),
111        }
112    }
113
114    /// 从 Dest 提取目标 ActrId
115    ///
116    /// - Dest::Shell → self_id(Workload → App 反向调用)
117    /// - Dest::Local → self_id(调用本地 Workload)
118    /// - Dest::Actor(id) → id(远程调用)
119    #[inline]
120    fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
121        match dest {
122            Dest::Shell | Dest::Local => &self.self_id,
123            Dest::Actor(id) => id,
124        }
125    }
126
127    /// Execute a non-generic RPC request call (useful for language bindings).
128    #[cfg_attr(
129        feature = "opentelemetry",
130        tracing::instrument(skip_all, name = "RuntimeContext.call_raw")
131    )]
132    pub async fn call_raw(
133        &self,
134        target: &Dest,
135        route_key: String,
136        payload_type: PayloadType,
137        payload: Bytes,
138        timeout_ms: i64,
139    ) -> ActorResult<Bytes> {
140        #[cfg(feature = "opentelemetry")]
141        use crate::wire::webrtc::trace::inject_span_context_to_rpc;
142
143        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
144        let mut envelope = RpcEnvelope {
145            route_key,
146            payload: Some(payload),
147            error: None,
148            traceparent: None,
149            tracestate: None,
150            request_id: uuid::Uuid::new_v4().to_string(),
151            metadata: vec![],
152            timeout_ms,
153        };
154        #[cfg(feature = "opentelemetry")]
155        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
156
157        let gate = self.select_gate(target)?;
158        let target_id = self.extract_target_id(target);
159        gate.send_request_with_type(target_id, payload_type, envelope)
160            .await
161    }
162
163    /// Execute a non-generic RPC message call (fire-and-forget, useful for language bindings).
164    #[cfg_attr(
165        feature = "opentelemetry",
166        tracing::instrument(skip_all, name = "RuntimeContext.tell_raw")
167    )]
168    pub async fn tell_raw(
169        &self,
170        target: &Dest,
171        route_key: String,
172        payload_type: PayloadType,
173        payload: Bytes,
174    ) -> ActorResult<()> {
175        #[cfg(feature = "opentelemetry")]
176        use crate::wire::webrtc::trace::inject_span_context_to_rpc;
177
178        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
179        let mut envelope = RpcEnvelope {
180            route_key,
181            payload: Some(payload),
182            error: None,
183            traceparent: None,
184            tracestate: None,
185            request_id: uuid::Uuid::new_v4().to_string(),
186            metadata: vec![],
187            timeout_ms: 0,
188        };
189        #[cfg(feature = "opentelemetry")]
190        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
191
192        let gate = self.select_gate(target)?;
193        let target_id = self.extract_target_id(target);
194        gate.send_message_with_type(target_id, payload_type, envelope)
195            .await
196    }
197
198    /// Send DataStream with an explicit payload type (lane selection).
199    ///
200    /// This is intended for language bindings; the `Context` trait method
201    /// `send_data_stream()` currently defaults to StreamReliable.
202    pub async fn send_data_stream_with_type(
203        &self,
204        target: &Dest,
205        payload_type: actr_protocol::PayloadType,
206        chunk: DataStream,
207    ) -> ActorResult<()> {
208        use actr_protocol::prost::Message as ProstMessage;
209
210        let payload = chunk.encode_to_vec();
211
212        let gate = self.select_gate(target)?;
213        let target_id = self.extract_target_id(target);
214
215        let result = gate
216            .send_data_stream(target_id, payload_type, bytes::Bytes::from(payload).into())
217            .await;
218
219        result
220    }
221
222    /// Get dependency fingerprint from Actr.lock.toml
223    fn get_dependency_fingerprint(&self, target_type: &ActrType) -> Option<String> {
224        let actr_lock = self.actr_lock.as_ref()?;
225
226        // Try different name formats to find the dependency
227        let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
228        let actr_type_name = format!("{}+{}", target_type.manufacturer, target_type.name);
229
230        // First try by service name
231        if let Some(dep) = actr_lock.get_dependency(&service_name) {
232            return Some(dep.fingerprint.clone());
233        }
234
235        // Try by actr_type format
236        if let Some(dep) = actr_lock.get_dependency(&actr_type_name) {
237            return Some(dep.fingerprint.clone());
238        }
239
240        // Try by just the name part
241        if let Some(dep) = actr_lock.get_dependency(&target_type.name) {
242            return Some(dep.fingerprint.clone());
243        }
244
245        // Search through all dependencies by actr_type field
246        for dep in &actr_lock.dependencies {
247            if dep.actr_type == actr_type_name || dep.actr_type == target_type.name {
248                return Some(dep.fingerprint.clone());
249            }
250        }
251
252        None
253    }
254
255    /// Internal: Send discovery request to signaling server
256    async fn send_discovery_request(
257        &self,
258        target_type: &ActrType,
259        candidate_count: u32,
260        client_fingerprint: String,
261    ) -> ActorResult<InternalDiscoveryResult> {
262        let criteria = route_candidates_request::NodeSelectionCriteria {
263            candidate_count,
264            ranking_factors: Vec::new(),
265            minimal_dependency_requirement: None,
266            minimal_health_requirement: None,
267        };
268
269        let request = RouteCandidatesRequest {
270            target_type: target_type.clone(),
271            criteria: Some(criteria),
272            client_location: None,
273            client_fingerprint,
274        };
275
276        let response = self
277            .signaling_client
278            .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
279            .await
280            .map_err(|e| {
281                ProtocolError::TransportError(format!("Route candidates request failed: {e}"))
282            })?;
283
284        match response.result {
285            Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
286                Ok(InternalDiscoveryResult {
287                    candidates: success.candidates,
288                    has_exact_match: success.has_exact_match.unwrap_or(false),
289                    is_sub_healthy: success.is_sub_healthy.unwrap_or(false),
290                    compatibility_info: success.compatibility_info,
291                })
292            }
293            Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
294                Err(ProtocolError::TransportError(format!(
295                    "Route candidates error {}: {}",
296                    err.code, err.message
297                )))
298            }
299            None => Err(ProtocolError::TransportError(
300                "Invalid route candidates response: missing result".to_string(),
301            )),
302        }
303    }
304
305    /// Internal: Handle negotiation result - log warnings and update compat.lock.toml
306    async fn handle_negotiation_result(
307        &self,
308        target_type: &ActrType,
309        client_fingerprint: &str,
310        compatibility_info: &[actr_protocol::CandidateCompatibilityInfo],
311        has_exact_match: bool,
312        is_sub_healthy: bool,
313    ) {
314        let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
315
316        // Log detailed compatibility info
317        for info in compatibility_info {
318            let status = if info.is_exact_match.unwrap_or(false) {
319                "✅ 精确匹配"
320            } else if let Some(ref result) = info.analysis_result {
321                match result.level() {
322                    actr_protocol::CompatibilityLevel::FullyCompatible => "✅ 完全兼容",
323                    actr_protocol::CompatibilityLevel::BackwardCompatible => "⚠️ 向后兼容",
324                    actr_protocol::CompatibilityLevel::BreakingChanges => "❌ 破坏性变更",
325                }
326            } else {
327                "❓ 未知"
328            };
329
330            tracing::debug!(
331                "   - 候选 {}: {} (指纹: {})",
332                info.candidate_id.serial_number,
333                status,
334                &info.candidate_fingerprint[..20.min(info.candidate_fingerprint.len())]
335            );
336        }
337
338        // Handle sub-healthy state - update compat.lock.toml if config_dir is available
339        if let Some(config_dir) = &self.config_dir {
340            if is_sub_healthy && !has_exact_match {
341                // Find the first compatible (non-exact) match for logging
342                if let Some(resolved) = compatibility_info.first() {
343                    tracing::warn!(
344                        "🟡 SYSTEM SUB-HEALTHY: Service '{}' using compatible fingerprint ({}) \
345                         instead of exact match ({}). Run 'actr install --force-update' to restore health.",
346                        service_name,
347                        &resolved.candidate_fingerprint
348                            [..20.min(resolved.candidate_fingerprint.len())],
349                        &client_fingerprint[..20.min(client_fingerprint.len())]
350                    );
351
352                    // Update compat.lock.toml
353                    let mut manager = CompatLockManager::new(config_dir.clone());
354                    if let Err(e) = manager
355                        .record_negotiation(
356                            &service_name,
357                            client_fingerprint,
358                            &resolved.candidate_fingerprint,
359                            false, // not exact match
360                            CompatibilityCheck::BackwardCompatible,
361                        )
362                        .await
363                    {
364                        tracing::warn!("Failed to update compat.lock.toml: {}", e);
365                    }
366                }
367            } else if has_exact_match {
368                // Exact match found - try to clean up compat.lock.toml entry if exists
369                let mut manager = CompatLockManager::new(config_dir.clone());
370                if let Ok(Some(_)) = manager.load().await {
371                    if let Some(resolved) = compatibility_info.first() {
372                        if let Err(e) = manager
373                            .record_negotiation(
374                                &service_name,
375                                client_fingerprint,
376                                &resolved.candidate_fingerprint,
377                                true, // exact match
378                                CompatibilityCheck::ExactMatch,
379                            )
380                            .await
381                        {
382                            tracing::debug!("Could not update compat.lock.toml: {}", e);
383                        }
384                    }
385                }
386            }
387        }
388    }
389}
390
391/// Internal discovery result structure
392struct InternalDiscoveryResult {
393    candidates: Vec<ActrId>,
394    has_exact_match: bool,
395    is_sub_healthy: bool,
396    compatibility_info: Vec<actr_protocol::CandidateCompatibilityInfo>,
397}
398
399#[async_trait]
400impl Context for RuntimeContext {
401    // ========== 数据访问方法 ==========
402
403    fn self_id(&self) -> &ActrId {
404        &self.self_id
405    }
406
407    fn caller_id(&self) -> Option<&ActrId> {
408        self.caller_id.as_ref()
409    }
410
411    fn request_id(&self) -> &str {
412        &self.request_id
413    }
414
415    // ========== 通信能力方法 ==========
416    #[cfg_attr(
417        feature = "opentelemetry",
418        tracing::instrument(skip_all, name = "RuntimeContext.call")
419    )]
420    async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
421        use actr_protocol::prost::Message as ProstMessage;
422
423        // 1. 编码请求为 protobuf bytes
424        let payload: Bytes = request.encode_to_vec().into();
425
426        // 2. 从 RpcRequest trait 获取 route_key(编译时确定)
427        let route_key = R::route_key().to_string();
428
429        // 3. 构造 RpcEnvelope(使用 W3C tracing)
430        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
431        let mut envelope = RpcEnvelope {
432            route_key,
433            payload: Some(payload),
434            error: None,
435            traceparent: None,
436            tracestate: None,
437            request_id: uuid::Uuid::new_v4().to_string(), // 生成新的 request_id
438            metadata: vec![],
439            timeout_ms: 30000, // 默认 30 秒超时
440        };
441        // Inject tracing context from current span
442        #[cfg(feature = "opentelemetry")]
443        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
444
445        // 4. 根据 Dest 选择 gate 并提取目标 ActrId(Shell/Local 立即可用,Actor 需要检查)
446        let gate = self.select_gate(target)?;
447        let target_id = self.extract_target_id(target);
448
449        // 5. 通过 OutGate enum dispatch 发送(零虚函数调用!)
450        // Respect request's declared payload type (lane selection)
451        let response_bytes = gate
452            .send_request_with_type(target_id, R::payload_type(), envelope)
453            .await?;
454
455        // 6. 解码响应(类型安全:R::Response)
456        R::Response::decode(&*response_bytes).map_err(|e| {
457            ProtocolError::Actr(ActrError::DecodeFailure {
458                message: format!(
459                    "Failed to decode {}: {}",
460                    std::any::type_name::<R::Response>(),
461                    e
462                ),
463            })
464        })
465    }
466
467    #[cfg_attr(
468        feature = "opentelemetry",
469        tracing::instrument(skip_all, name = "RuntimeContext.tell")
470    )]
471    async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
472        // 1. 编码消息
473        let payload: Bytes = message.encode_to_vec().into();
474
475        // 2. 获取 route_key
476        let route_key = R::route_key().to_string();
477
478        // 3. 构造 RpcEnvelope(fire-and-forget 语义)
479        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
480        let mut envelope = RpcEnvelope {
481            route_key,
482            payload: Some(payload),
483            error: None,
484            traceparent: None,
485            tracestate: None,
486            request_id: uuid::Uuid::new_v4().to_string(),
487            metadata: vec![],
488            timeout_ms: 0, // 0 表示不等待响应
489        };
490        // Inject tracing context from current span
491        #[cfg(feature = "opentelemetry")]
492        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
493
494        // 4. 根据 Dest 选择 gate 并提取目标 ActrId(Shell/Local 立即可用,Actor 需要检查)
495        let gate = self.select_gate(target)?;
496        let target_id = self.extract_target_id(target);
497
498        // 5. 通过 OutGate enum dispatch 发送(respect payload type)
499        gate.send_message_with_type(target_id, R::payload_type(), envelope)
500            .await
501    }
502
503    // ========== Fast Path: DataStream Methods ==========
504
505    async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
506    where
507        F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
508    {
509        tracing::debug!(
510            "📊 Registering DataStream callback for stream_id: {}",
511            stream_id
512        );
513        self.data_stream_registry
514            .register(stream_id, Arc::new(callback));
515        Ok(())
516    }
517
518    async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
519        tracing::debug!(
520            "🚫 Unregistering DataStream callback for stream_id: {}",
521            stream_id
522        );
523        self.data_stream_registry.unregister(stream_id);
524        Ok(())
525    }
526
527    async fn send_data_stream(&self, target: &Dest, chunk: DataStream) -> ActorResult<()> {
528        use actr_protocol::prost::Message as ProstMessage;
529
530        // 1. Serialize DataStream to bytes
531        let payload = chunk.encode_to_vec();
532
533        tracing::debug!(
534            "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
535            chunk.stream_id,
536            chunk.sequence,
537            payload.len()
538        );
539
540        // 2. Select gate based on Dest
541        let gate = self.select_gate(target)?;
542        let target_id = self.extract_target_id(target);
543
544        // 3. Send via OutGate with appropriate PayloadType
545        // Use StreamReliable for reliable ordered transmission
546        // TODO: Allow user to choose between StreamReliable and StreamLatencyFirst
547        gate.send_data_stream(
548            target_id,
549            actr_protocol::PayloadType::StreamReliable,
550            bytes::Bytes::from(payload),
551        )
552        .await
553    }
554
555    async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
556        if !self.signaling_client.is_connected() {
557            return Err(ProtocolError::TransportError(
558                "Signaling client is not connected.".to_string(),
559            ));
560        }
561
562        let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
563
564        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
565        // Step 0: Fast Path - Check compat.lock.toml for cached negotiation
566        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
567        if let Some(config_dir) = &self.config_dir {
568            let mut compat_lock_manager = CompatLockManager::new(config_dir.clone());
569            if let Ok(Some(compat_lock)) = compat_lock_manager.load().await {
570                if let Some(cached_entry) = compat_lock.find_valid_entry(&service_name) {
571                    tracing::info!(
572                        "⚡ Fast path: Using cached negotiation for '{}' (resolved: {})",
573                        service_name,
574                        &cached_entry.resolved_fingerprint
575                            [..20.min(cached_entry.resolved_fingerprint.len())]
576                    );
577
578                    // Use the cached resolved_fingerprint to find candidates
579                    let result = self
580                        .send_discovery_request(
581                            target_type,
582                            1,
583                            cached_entry.resolved_fingerprint.clone(),
584                        )
585                        .await?;
586
587                    if let Some(candidate) = result.candidates.into_iter().next() {
588                        tracing::info!(
589                            "📊 服务发现结果 [{}]: 1 个候选 (快速路径, sub_healthy=true)",
590                            service_name
591                        );
592                        return Ok(candidate);
593                    }
594                    // If fast path fails, fall through to normal discovery
595                    tracing::warn!(
596                        "⚠️ Fast path failed for '{}', falling back to normal discovery",
597                        service_name
598                    );
599                }
600            }
601        }
602
603        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
604        // Step 1: Get fingerprint from Actr.lock.toml (REQUIRED)
605        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
606        let client_fingerprint = self.get_dependency_fingerprint(target_type).ok_or_else(|| {
607            tracing::error!(
608                severity = 10,
609                error_category = "dependency_missing",
610                "❌ DEPENDENCY NOT FOUND: Service '{}' is not declared in Actr.lock.toml.\n\
611                 Please run 'actr install' to generate the lock file with all dependencies.",
612                service_name
613            );
614            ProtocolError::Actr(ActrError::DependencyNotFound {
615                service_name: service_name.clone(),
616                message: format!(
617                    "Dependency '{}' not found in Actr.lock.toml. Run 'actr install' to resolve dependencies.",
618                    service_name
619                ),
620            })
621        })?;
622
623        tracing::debug!(
624            "📋 Found dependency fingerprint for '{}': {}",
625            service_name,
626            &client_fingerprint[..20.min(client_fingerprint.len())]
627        );
628
629        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
630        // Step 2: Send discovery request to signaling server
631        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
632        let result = self
633            .send_discovery_request(target_type, 1, client_fingerprint.clone())
634            .await?;
635
636        let has_exact_match = result.has_exact_match;
637        let is_sub_healthy = result.is_sub_healthy;
638
639        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
640        // Step 3 & 4: Handle negotiation result
641        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
642        self.handle_negotiation_result(
643            target_type,
644            &client_fingerprint,
645            &result.compatibility_info,
646            has_exact_match,
647            is_sub_healthy,
648        )
649        .await;
650
651        // Log result
652        tracing::info!(
653            "📊 服务发现结果 [{}]: {} 个候选, exact_match={}, sub_healthy={}",
654            service_name,
655            result.candidates.len(),
656            has_exact_match,
657            is_sub_healthy
658        );
659
660        result.candidates.into_iter().next().ok_or_else(|| {
661            ProtocolError::TargetNotFound(format!(
662                "No route candidates for type {}/{}",
663                target_type.manufacturer, target_type.name
664            ))
665        })
666    }
667
668    #[cfg_attr(
669        feature = "opentelemetry",
670        tracing::instrument(skip_all, name = "RuntimeContext.call_raw")
671    )]
672    async fn call_raw(
673        &self,
674        target: &ActrId,
675        route_key: &str,
676        payload: Bytes,
677    ) -> ActorResult<Bytes> {
678        // 1. Construct RpcEnvelope with raw payload
679        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
680        let mut envelope = RpcEnvelope {
681            route_key: route_key.to_string(),
682            payload: Some(payload),
683            error: None,
684            traceparent: None,
685            tracestate: None,
686            request_id: uuid::Uuid::new_v4().to_string(),
687            metadata: vec![],
688            timeout_ms: 30000, // Default 30 second timeout
689        };
690
691        // Inject tracing context from current span
692        #[cfg(feature = "opentelemetry")]
693        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
694
695        // 2. Select outproc gate (raw calls are always remote)
696        let gate = self.outproc_gate.as_ref().ok_or_else(|| {
697            ProtocolError::Actr(ActrError::GateNotInitialized {
698                message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
699                    .to_string(),
700            })
701        })?;
702
703        // 3. Send request and return raw response bytes
704        gate.send_request(target, envelope).await
705    }
706
707    // ========== Fast Path: MediaTrack Methods ==========
708
709    async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
710    where
711        F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
712    {
713        tracing::debug!(
714            "📹 Registering MediaTrack callback for track_id: {}",
715            track_id
716        );
717        self.media_frame_registry
718            .register(track_id, Arc::new(callback));
719        Ok(())
720    }
721
722    async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
723        tracing::debug!(
724            "📹 Unregistering MediaTrack callback for track_id: {}",
725            track_id
726        );
727        self.media_frame_registry.unregister(track_id);
728        Ok(())
729    }
730
731    async fn send_media_sample(
732        &self,
733        target: &Dest,
734        track_id: &str,
735        sample: MediaSample,
736    ) -> ActorResult<()> {
737        // 1. Select appropriate gate based on Dest
738        let gate = self.select_gate(target)?;
739
740        // 2. Extract target ActrId
741        let target_id = self.extract_target_id(target);
742
743        // 3. Send via OutGate (delegates to WebRTC Track)
744        gate.send_media_sample(target_id, track_id, sample).await
745    }
746}