actr_runtime/
context.rs

1//! Runtime Context Implementation
2//!
3//! Implements the Context trait defined in actr-framework.
4
5use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::OutGate;
7use crate::wire::webrtc::SignalingClient;
8use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
9use actr_protocol::{
10    AIdCredential, ActorResult, ActrError, ActrId, ActrType, ProtocolError, RouteCandidatesRequest,
11    RpcEnvelope, RpcRequest, route_candidates_request,
12};
13use async_trait::async_trait;
14use futures_util::future::BoxFuture;
15use std::sync::Arc;
16
17/// RuntimeContext - Runtime's implementation of Context trait
18///
19/// # 设计特性
20///
21/// - **零虚函数**:内部使用 OutGate enum dispatch(非 dyn)
22/// - **智能路由**:根据 Dest 自动选择 InprocOut 或 OutprocOut
23/// - **完整实现**:包含 call/tell 的完整逻辑(编码、发送、解码)
24/// - **类型安全**:泛型方法提供编译时类型检查
25///
26/// # 性能
27///
28/// - OutGate 是 enum,使用静态分发
29/// - 编译器可完全内联整个调用链
30/// - 零虚函数调用开销
31#[derive(Clone)]
32pub struct RuntimeContext {
33    self_id: ActrId,
34    caller_id: Option<ActrId>,
35    request_id: String,
36    inproc_gate: OutGate,                          // Shell/Local 调用 - 立即可用
37    outproc_gate: Option<OutGate>,                 // 远程 Actor 调用 - 延迟初始化
38    data_stream_registry: Arc<DataStreamRegistry>, // DataStream 回调注册表
39    media_frame_registry: Arc<MediaFrameRegistry>, // MediaTrack 回调注册表
40    signaling_client: Arc<dyn SignalingClient>,
41    credential: AIdCredential,
42}
43
44impl RuntimeContext {
45    /// 创建新的 RuntimeContext
46    ///
47    /// # 参数
48    ///
49    /// - `self_id`: 当前 Actor 的 ID
50    /// - `caller_id`: 调用方 Actor ID(可选)
51    /// - `request_id`: 当前请求唯一 ID
52    /// - `inproc_gate`: 进程内通信 gate(立即可用)
53    /// - `outproc_gate`: 跨进程通信 gate(可能为 None,等待 WebRTC 初始化)
54    /// - `data_stream_registry`: DataStream 回调注册表
55    /// - `media_frame_registry`: MediaTrack 回调注册表
56    /// - `signaling_client`: 用于路由发现的信令客户端
57    /// - `credential`: 该 Actor 的凭证(调用信令接口时使用)
58    #[allow(clippy::too_many_arguments)] // Internal API - all parameters are required
59    pub fn new(
60        self_id: ActrId,
61        caller_id: Option<ActrId>,
62        request_id: String,
63        inproc_gate: OutGate,
64        outproc_gate: Option<OutGate>,
65        data_stream_registry: Arc<DataStreamRegistry>,
66        media_frame_registry: Arc<MediaFrameRegistry>,
67        signaling_client: Arc<dyn SignalingClient>,
68        credential: AIdCredential,
69    ) -> Self {
70        Self {
71            self_id,
72            caller_id,
73            request_id,
74            inproc_gate,
75            outproc_gate,
76            data_stream_registry,
77            media_frame_registry,
78            signaling_client,
79            credential,
80        }
81    }
82
83    /// 根据 Dest 选择合适的 gate
84    ///
85    /// - Dest::Shell → inproc_gate(立即可用)
86    /// - Dest::Local → inproc_gate(立即可用)
87    /// - Dest::Actor(_) → outproc_gate(需要检查是否已初始化)
88    #[inline]
89    fn select_gate(&self, dest: &Dest) -> ActorResult<&OutGate> {
90        match dest {
91            Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
92            Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
93                ProtocolError::Actr(ActrError::GateNotInitialized {
94                    message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
95                        .to_string(),
96                })
97            }),
98        }
99    }
100
101    /// 从 Dest 提取目标 ActrId
102    ///
103    /// - Dest::Shell → self_id(Workload → App 反向调用)
104    /// - Dest::Local → self_id(调用本地 Workload)
105    /// - Dest::Actor(id) → id(远程调用)
106    #[inline]
107    fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
108        match dest {
109            Dest::Shell | Dest::Local => &self.self_id,
110            Dest::Actor(id) => id,
111        }
112    }
113}
114
115#[async_trait]
116impl Context for RuntimeContext {
117    // ========== 数据访问方法 ==========
118
119    fn self_id(&self) -> &ActrId {
120        &self.self_id
121    }
122
123    fn caller_id(&self) -> Option<&ActrId> {
124        self.caller_id.as_ref()
125    }
126
127    fn request_id(&self) -> &str {
128        &self.request_id
129    }
130
131    // ========== 通信能力方法 ==========
132
133    async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
134        use actr_protocol::prost::Message as ProstMessage;
135
136        // 1. 编码请求为 protobuf bytes
137        let payload: Bytes = request.encode_to_vec().into();
138
139        // 2. 从 RpcRequest trait 获取 route_key(编译时确定)
140        let route_key = R::route_key().to_string();
141
142        // 3. 构造 RpcEnvelope(使用 W3C tracing)
143        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
144        let mut envelope = RpcEnvelope {
145            route_key,
146            payload: Some(payload),
147            error: None,
148            traceparent: None,
149            tracestate: None,
150            request_id: uuid::Uuid::new_v4().to_string(), // 生成新的 request_id
151            metadata: vec![],
152            timeout_ms: 30000, // 默认 30 秒超时
153        };
154        // Inject tracing context from current span
155        #[cfg(feature = "opentelemetry")]
156        {
157            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
158            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
159        }
160
161        // 4. 根据 Dest 选择 gate 并提取目标 ActrId(Shell/Local 立即可用,Actor 需要检查)
162        let gate = self.select_gate(target)?;
163        let target_id = self.extract_target_id(target);
164
165        // 5. 通过 OutGate enum dispatch 发送(零虚函数调用!)
166        let response_bytes = gate.send_request(target_id, envelope).await?;
167
168        // 6. 解码响应(类型安全:R::Response)
169        R::Response::decode(&*response_bytes).map_err(|e| {
170            ProtocolError::Actr(ActrError::DecodeFailure {
171                message: format!(
172                    "Failed to decode {}: {}",
173                    std::any::type_name::<R::Response>(),
174                    e
175                ),
176            })
177        })
178    }
179
180    async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
181        // 1. 编码消息
182        let payload: Bytes = message.encode_to_vec().into();
183
184        // 2. 获取 route_key
185        let route_key = R::route_key().to_string();
186
187        // 3. 构造 RpcEnvelope(fire-and-forget 语义)
188        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
189        let mut envelope = RpcEnvelope {
190            route_key,
191            payload: Some(payload),
192            error: None,
193            traceparent: None,
194            tracestate: None,
195            request_id: uuid::Uuid::new_v4().to_string(),
196            metadata: vec![],
197            timeout_ms: 0, // 0 表示不等待响应
198        };
199        // Inject tracing context from current span
200        #[cfg(feature = "opentelemetry")]
201        {
202            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
203            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
204        }
205
206        // 4. 根据 Dest 选择 gate 并提取目标 ActrId(Shell/Local 立即可用,Actor 需要检查)
207        let gate = self.select_gate(target)?;
208        let target_id = self.extract_target_id(target);
209
210        // 5. 通过 OutGate enum dispatch 发送
211        gate.send_message(target_id, envelope).await
212    }
213
214    // ========== Fast Path: DataStream Methods ==========
215
216    async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
217    where
218        F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
219    {
220        tracing::debug!(
221            "📊 Registering DataStream callback for stream_id: {}",
222            stream_id
223        );
224        self.data_stream_registry
225            .register(stream_id, Arc::new(callback));
226        Ok(())
227    }
228
229    async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
230        tracing::debug!(
231            "🚫 Unregistering DataStream callback for stream_id: {}",
232            stream_id
233        );
234        self.data_stream_registry.unregister(stream_id);
235        Ok(())
236    }
237
238    async fn send_data_stream(&self, target: &Dest, chunk: DataStream) -> ActorResult<()> {
239        use actr_protocol::prost::Message as ProstMessage;
240
241        // 1. Serialize DataStream to bytes
242        let payload = chunk.encode_to_vec();
243
244        tracing::debug!(
245            "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
246            chunk.stream_id,
247            chunk.sequence,
248            payload.len()
249        );
250
251        // 2. Select gate based on Dest
252        let gate = self.select_gate(target)?;
253        let target_id = self.extract_target_id(target);
254
255        // 3. Send via OutGate with appropriate PayloadType
256        // Use StreamReliable for reliable ordered transmission
257        // TODO: Allow user to choose between StreamReliable and StreamLatencyFirst
258        gate.send_data_stream(
259            target_id,
260            actr_protocol::PayloadType::StreamReliable,
261            bytes::Bytes::from(payload),
262        )
263        .await
264    }
265
266    async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
267        if !self.signaling_client.is_connected() {
268            return Err(ProtocolError::TransportError(
269                "Signaling client is not connected.".to_string(),
270            ));
271        }
272
273        let criteria = route_candidates_request::NodeSelectionCriteria {
274            candidate_count: 1,
275            ranking_factors: Vec::new(),
276            minimal_dependency_requirement: None,
277            minimal_health_requirement: None,
278        };
279
280        let request = RouteCandidatesRequest {
281            target_type: target_type.clone(),
282            criteria: Some(criteria),
283            client_location: None,
284        };
285
286        let response = self
287            .signaling_client
288            .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
289            .await
290            .map_err(|e| {
291                ProtocolError::TransportError(format!("Route candidates request failed: {e}"))
292            })?;
293
294        match response.result {
295            Some(actr_protocol::route_candidates_response::Result::Success(ok)) => {
296                ok.candidates.into_iter().next().ok_or_else(|| {
297                    ProtocolError::TargetNotFound(format!(
298                        "No route candidates for type {}.{}",
299                        target_type.manufacturer, target_type.name
300                    ))
301                })
302            }
303            Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
304                Err(ProtocolError::TransportError(format!(
305                    "Route candidates error {}: {}",
306                    err.code, err.message
307                )))
308            }
309            None => Err(ProtocolError::TransportError(
310                "Route candidates response missing result".to_string(),
311            )),
312        }
313    }
314
315    // ========== Fast Path: MediaTrack Methods ==========
316
317    async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
318    where
319        F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
320    {
321        tracing::debug!(
322            "📹 Registering MediaTrack callback for track_id: {}",
323            track_id
324        );
325        self.media_frame_registry
326            .register(track_id, Arc::new(callback));
327        Ok(())
328    }
329
330    async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
331        tracing::debug!(
332            "📹 Unregistering MediaTrack callback for track_id: {}",
333            track_id
334        );
335        self.media_frame_registry.unregister(track_id);
336        Ok(())
337    }
338
339    async fn send_media_sample(
340        &self,
341        target: &Dest,
342        track_id: &str,
343        sample: MediaSample,
344    ) -> ActorResult<()> {
345        // 1. Select appropriate gate based on Dest
346        let gate = self.select_gate(target)?;
347
348        // 2. Extract target ActrId
349        let target_id = self.extract_target_id(target);
350
351        // 3. Send via OutGate (delegates to WebRTC Track)
352        gate.send_media_sample(target_id, track_id, sample).await
353    }
354}