actr_runtime/
context.rs

1//! Runtime Context Implementation
2//!
3//! Implements the Context trait defined in actr-framework.
4
5use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::OutGate;
7use crate::wire::webrtc::SignalingClient;
8#[cfg(feature = "opentelemetry")]
9use crate::wire::webrtc::trace::inject_span_context_to_rpc;
10use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
11use actr_protocol::{
12    AIdCredential, ActorResult, ActrError, ActrId, ActrType, PayloadType, ProtocolError,
13    RouteCandidatesRequest, RpcEnvelope, RpcRequest, route_candidates_request,
14};
15use async_trait::async_trait;
16use futures_util::future::BoxFuture;
17use std::sync::Arc;
18
19/// RuntimeContext - Runtime's implementation of Context trait
20///
21/// # 设计特性
22///
23/// - **零虚函数**:内部使用 OutGate enum dispatch(非 dyn)
24/// - **智能路由**:根据 Dest 自动选择 InprocOut 或 OutprocOut
25/// - **完整实现**:包含 call/tell 的完整逻辑(编码、发送、解码)
26/// - **类型安全**:泛型方法提供编译时类型检查
27///
28/// # 性能
29///
30/// - OutGate 是 enum,使用静态分发
31/// - 编译器可完全内联整个调用链
32/// - 零虚函数调用开销
33#[derive(Clone)]
34pub struct RuntimeContext {
35    self_id: ActrId,
36    caller_id: Option<ActrId>,
37    request_id: String,
38    inproc_gate: OutGate,                          // Shell/Local 调用 - 立即可用
39    outproc_gate: Option<OutGate>,                 // 远程 Actor 调用 - 延迟初始化
40    data_stream_registry: Arc<DataStreamRegistry>, // DataStream 回调注册表
41    media_frame_registry: Arc<MediaFrameRegistry>, // MediaTrack 回调注册表
42    signaling_client: Arc<dyn SignalingClient>,
43    credential: AIdCredential,
44}
45
46impl RuntimeContext {
47    /// 创建新的 RuntimeContext
48    ///
49    /// # 参数
50    ///
51    /// - `self_id`: 当前 Actor 的 ID
52    /// - `caller_id`: 调用方 Actor ID(可选)
53    /// - `request_id`: 当前请求唯一 ID
54    /// - `inproc_gate`: 进程内通信 gate(立即可用)
55    /// - `outproc_gate`: 跨进程通信 gate(可能为 None,等待 WebRTC 初始化)
56    /// - `data_stream_registry`: DataStream 回调注册表
57    /// - `media_frame_registry`: MediaTrack 回调注册表
58    /// - `signaling_client`: 用于路由发现的信令客户端
59    /// - `credential`: 该 Actor 的凭证(调用信令接口时使用)
60    #[allow(clippy::too_many_arguments)] // Internal API - all parameters are required
61    pub fn new(
62        self_id: ActrId,
63        caller_id: Option<ActrId>,
64        request_id: String,
65        inproc_gate: OutGate,
66        outproc_gate: Option<OutGate>,
67        data_stream_registry: Arc<DataStreamRegistry>,
68        media_frame_registry: Arc<MediaFrameRegistry>,
69        signaling_client: Arc<dyn SignalingClient>,
70        credential: AIdCredential,
71    ) -> Self {
72        Self {
73            self_id,
74            caller_id,
75            request_id,
76            inproc_gate,
77            outproc_gate,
78            data_stream_registry,
79            media_frame_registry,
80            signaling_client,
81            credential,
82        }
83    }
84
85    /// 根据 Dest 选择合适的 gate
86    ///
87    /// - Dest::Shell → inproc_gate(立即可用)
88    /// - Dest::Local → inproc_gate(立即可用)
89    /// - Dest::Actor(_) → outproc_gate(需要检查是否已初始化)
90    #[inline]
91    fn select_gate(&self, dest: &Dest) -> ActorResult<&OutGate> {
92        match dest {
93            Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
94            Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
95                ProtocolError::Actr(ActrError::GateNotInitialized {
96                    message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
97                        .to_string(),
98                })
99            }),
100        }
101    }
102
103    /// 从 Dest 提取目标 ActrId
104    ///
105    /// - Dest::Shell → self_id(Workload → App 反向调用)
106    /// - Dest::Local → self_id(调用本地 Workload)
107    /// - Dest::Actor(id) → id(远程调用)
108    #[inline]
109    fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
110        match dest {
111            Dest::Shell | Dest::Local => &self.self_id,
112            Dest::Actor(id) => id,
113        }
114    }
115
116    /// Execute a non-generic RPC request call (useful for language bindings).
117    #[cfg_attr(
118        feature = "opentelemetry",
119        tracing::instrument(skip_all, name = "RuntimeContext.call_raw")
120    )]
121    pub async fn call_raw(
122        &self,
123        target: &Dest,
124        route_key: String,
125        payload_type: PayloadType,
126        payload: Bytes,
127        timeout_ms: i64,
128    ) -> ActorResult<Bytes> {
129        #[cfg(feature = "opentelemetry")]
130        use crate::wire::webrtc::trace::inject_span_context_to_rpc;
131
132        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
133        let mut envelope = RpcEnvelope {
134            route_key,
135            payload: Some(payload),
136            error: None,
137            traceparent: None,
138            tracestate: None,
139            request_id: uuid::Uuid::new_v4().to_string(),
140            metadata: vec![],
141            timeout_ms,
142        };
143        #[cfg(feature = "opentelemetry")]
144        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
145
146        let gate = self.select_gate(target)?;
147        let target_id = self.extract_target_id(target);
148        gate.send_request_with_type(target_id, payload_type, envelope)
149            .await
150    }
151
152    /// Execute a non-generic RPC message call (fire-and-forget, useful for language bindings).
153    #[cfg_attr(
154        feature = "opentelemetry",
155        tracing::instrument(skip_all, name = "RuntimeContext.tell_raw")
156    )]
157    pub async fn tell_raw(
158        &self,
159        target: &Dest,
160        route_key: String,
161        payload_type: PayloadType,
162        payload: Bytes,
163    ) -> ActorResult<()> {
164        #[cfg(feature = "opentelemetry")]
165        use crate::wire::webrtc::trace::inject_span_context_to_rpc;
166
167        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
168        let mut envelope = RpcEnvelope {
169            route_key,
170            payload: Some(payload),
171            error: None,
172            traceparent: None,
173            tracestate: None,
174            request_id: uuid::Uuid::new_v4().to_string(),
175            metadata: vec![],
176            timeout_ms: 0,
177        };
178        #[cfg(feature = "opentelemetry")]
179        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
180
181        let gate = self.select_gate(target)?;
182        let target_id = self.extract_target_id(target);
183        gate.send_message_with_type(target_id, payload_type, envelope)
184            .await
185    }
186
187    /// Send DataStream with an explicit payload type (lane selection).
188    ///
189    /// This is intended for language bindings; the `Context` trait method
190    /// `send_data_stream()` currently defaults to StreamReliable.
191    pub async fn send_data_stream_with_type(
192        &self,
193        target: &Dest,
194        payload_type: actr_protocol::PayloadType,
195        chunk: DataStream,
196    ) -> ActorResult<()> {
197        use actr_protocol::prost::Message as ProstMessage;
198
199        let payload = chunk.encode_to_vec();
200
201        let gate = self.select_gate(target)?;
202        let target_id = self.extract_target_id(target);
203
204        let result = gate
205            .send_data_stream(target_id, payload_type, bytes::Bytes::from(payload).into())
206            .await;
207
208        result
209    }
210}
211
212#[async_trait]
213impl Context for RuntimeContext {
214    // ========== 数据访问方法 ==========
215
216    fn self_id(&self) -> &ActrId {
217        &self.self_id
218    }
219
220    fn caller_id(&self) -> Option<&ActrId> {
221        self.caller_id.as_ref()
222    }
223
224    fn request_id(&self) -> &str {
225        &self.request_id
226    }
227
228    // ========== 通信能力方法 ==========
229    #[cfg_attr(
230        feature = "opentelemetry",
231        tracing::instrument(skip_all, name = "RuntimeContext.call")
232    )]
233    async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
234        use actr_protocol::prost::Message as ProstMessage;
235
236        // 1. 编码请求为 protobuf bytes
237        let payload: Bytes = request.encode_to_vec().into();
238
239        // 2. 从 RpcRequest trait 获取 route_key(编译时确定)
240        let route_key = R::route_key().to_string();
241
242        // 3. 构造 RpcEnvelope(使用 W3C tracing)
243        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
244        let mut envelope = RpcEnvelope {
245            route_key,
246            payload: Some(payload),
247            error: None,
248            traceparent: None,
249            tracestate: None,
250            request_id: uuid::Uuid::new_v4().to_string(), // 生成新的 request_id
251            metadata: vec![],
252            timeout_ms: 30000, // 默认 30 秒超时
253        };
254        // Inject tracing context from current span
255        #[cfg(feature = "opentelemetry")]
256        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
257
258        // 4. 根据 Dest 选择 gate 并提取目标 ActrId(Shell/Local 立即可用,Actor 需要检查)
259        let gate = self.select_gate(target)?;
260        let target_id = self.extract_target_id(target);
261
262        // 5. 通过 OutGate enum dispatch 发送(零虚函数调用!)
263        // Respect request's declared payload type (lane selection)
264        let response_bytes = gate
265            .send_request_with_type(target_id, R::payload_type(), envelope)
266            .await?;
267
268        // 6. 解码响应(类型安全:R::Response)
269        R::Response::decode(&*response_bytes).map_err(|e| {
270            ProtocolError::Actr(ActrError::DecodeFailure {
271                message: format!(
272                    "Failed to decode {}: {}",
273                    std::any::type_name::<R::Response>(),
274                    e
275                ),
276            })
277        })
278    }
279
280    #[cfg_attr(
281        feature = "opentelemetry",
282        tracing::instrument(skip_all, name = "RuntimeContext.tell")
283    )]
284    async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
285        // 1. 编码消息
286        let payload: Bytes = message.encode_to_vec().into();
287
288        // 2. 获取 route_key
289        let route_key = R::route_key().to_string();
290
291        // 3. 构造 RpcEnvelope(fire-and-forget 语义)
292        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
293        let mut envelope = RpcEnvelope {
294            route_key,
295            payload: Some(payload),
296            error: None,
297            traceparent: None,
298            tracestate: None,
299            request_id: uuid::Uuid::new_v4().to_string(),
300            metadata: vec![],
301            timeout_ms: 0, // 0 表示不等待响应
302        };
303        // Inject tracing context from current span
304        #[cfg(feature = "opentelemetry")]
305        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
306
307        // 4. 根据 Dest 选择 gate 并提取目标 ActrId(Shell/Local 立即可用,Actor 需要检查)
308        let gate = self.select_gate(target)?;
309        let target_id = self.extract_target_id(target);
310
311        // 5. 通过 OutGate enum dispatch 发送(respect payload type)
312        gate.send_message_with_type(target_id, R::payload_type(), envelope)
313            .await
314    }
315
316    // ========== Fast Path: DataStream Methods ==========
317
318    async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
319    where
320        F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
321    {
322        tracing::debug!(
323            "📊 Registering DataStream callback for stream_id: {}",
324            stream_id
325        );
326        self.data_stream_registry
327            .register(stream_id, Arc::new(callback));
328        Ok(())
329    }
330
331    async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
332        tracing::debug!(
333            "🚫 Unregistering DataStream callback for stream_id: {}",
334            stream_id
335        );
336        self.data_stream_registry.unregister(stream_id);
337        Ok(())
338    }
339
340    async fn send_data_stream(&self, target: &Dest, chunk: DataStream) -> ActorResult<()> {
341        use actr_protocol::prost::Message as ProstMessage;
342
343        // 1. Serialize DataStream to bytes
344        let payload = chunk.encode_to_vec();
345
346        tracing::debug!(
347            "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
348            chunk.stream_id,
349            chunk.sequence,
350            payload.len()
351        );
352
353        // 2. Select gate based on Dest
354        let gate = self.select_gate(target)?;
355        let target_id = self.extract_target_id(target);
356
357        // 3. Send via OutGate with appropriate PayloadType
358        // Use StreamReliable for reliable ordered transmission
359        // TODO: Allow user to choose between StreamReliable and StreamLatencyFirst
360        gate.send_data_stream(
361            target_id,
362            actr_protocol::PayloadType::StreamReliable,
363            bytes::Bytes::from(payload),
364        )
365        .await
366    }
367
368    async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
369        if !self.signaling_client.is_connected() {
370            return Err(ProtocolError::TransportError(
371                "Signaling client is not connected.".to_string(),
372            ));
373        }
374
375        let criteria = route_candidates_request::NodeSelectionCriteria {
376            candidate_count: 1,
377            ranking_factors: Vec::new(),
378            minimal_dependency_requirement: None,
379            minimal_health_requirement: None,
380        };
381
382        let request = RouteCandidatesRequest {
383            target_type: target_type.clone(),
384            criteria: Some(criteria),
385            client_location: None,
386        };
387
388        let response = self
389            .signaling_client
390            .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
391            .await
392            .map_err(|e| {
393                ProtocolError::TransportError(format!("Route candidates request failed: {e}"))
394            })?;
395
396        match response.result {
397            Some(actr_protocol::route_candidates_response::Result::Success(ok)) => {
398                ok.candidates.into_iter().next().ok_or_else(|| {
399                    ProtocolError::TargetNotFound(format!(
400                        "No route candidates for type {}.{}",
401                        target_type.manufacturer, target_type.name
402                    ))
403                })
404            }
405            Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
406                Err(ProtocolError::TransportError(format!(
407                    "Route candidates error {}: {}",
408                    err.code, err.message
409                )))
410            }
411            None => Err(ProtocolError::TransportError(
412                "Route candidates response missing result".to_string(),
413            )),
414        }
415    }
416
417    #[cfg_attr(
418        feature = "opentelemetry",
419        tracing::instrument(skip_all, name = "RuntimeContext.call_raw")
420    )]
421    async fn call_raw(
422        &self,
423        target: &ActrId,
424        route_key: &str,
425        payload: Bytes,
426    ) -> ActorResult<Bytes> {
427        // 1. Construct RpcEnvelope with raw payload
428        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
429        let mut envelope = RpcEnvelope {
430            route_key: route_key.to_string(),
431            payload: Some(payload),
432            error: None,
433            traceparent: None,
434            tracestate: None,
435            request_id: uuid::Uuid::new_v4().to_string(),
436            metadata: vec![],
437            timeout_ms: 30000, // Default 30 second timeout
438        };
439
440        // Inject tracing context from current span
441        #[cfg(feature = "opentelemetry")]
442        inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
443
444        // 2. Select outproc gate (raw calls are always remote)
445        let gate = self.outproc_gate.as_ref().ok_or_else(|| {
446            ProtocolError::Actr(ActrError::GateNotInitialized {
447                message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
448                    .to_string(),
449            })
450        })?;
451
452        // 3. Send request and return raw response bytes
453        gate.send_request(target, envelope).await
454    }
455
456    // ========== Fast Path: MediaTrack Methods ==========
457
458    async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
459    where
460        F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
461    {
462        tracing::debug!(
463            "📹 Registering MediaTrack callback for track_id: {}",
464            track_id
465        );
466        self.media_frame_registry
467            .register(track_id, Arc::new(callback));
468        Ok(())
469    }
470
471    async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
472        tracing::debug!(
473            "📹 Unregistering MediaTrack callback for track_id: {}",
474            track_id
475        );
476        self.media_frame_registry.unregister(track_id);
477        Ok(())
478    }
479
480    async fn send_media_sample(
481        &self,
482        target: &Dest,
483        track_id: &str,
484        sample: MediaSample,
485    ) -> ActorResult<()> {
486        // 1. Select appropriate gate based on Dest
487        let gate = self.select_gate(target)?;
488
489        // 2. Extract target ActrId
490        let target_id = self.extract_target_id(target);
491
492        // 3. Send via OutGate (delegates to WebRTC Track)
493        gate.send_media_sample(target_id, track_id, sample).await
494    }
495}