actr_runtime/
context.rs

1//! Runtime Context Implementation
2//!
3//! Implements the Context trait defined in actr-framework.
4
5use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::OutGate;
7use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
8use actr_protocol::{ActorResult, ActrError, ActrId, ProtocolError, RpcEnvelope, RpcRequest};
9use async_trait::async_trait;
10use futures_util::future::BoxFuture;
11use std::sync::Arc;
12
13/// RuntimeContext - Runtime's implementation of Context trait
14///
15/// # 设计特性
16///
17/// - **零虚函数**:内部使用 OutGate enum dispatch(非 dyn)
18/// - **智能路由**:根据 Dest 自动选择 InprocOut 或 OutprocOut
19/// - **完整实现**:包含 call/tell 的完整逻辑(编码、发送、解码)
20/// - **类型安全**:泛型方法提供编译时类型检查
21///
22/// # 性能
23///
24/// - OutGate 是 enum,使用静态分发
25/// - 编译器可完全内联整个调用链
26/// - 零虚函数调用开销
27#[derive(Clone)]
28pub struct RuntimeContext {
29    self_id: ActrId,
30    caller_id: Option<ActrId>,
31    trace_id: String,
32    request_id: String,
33    inproc_gate: OutGate,                          // Shell/Local 调用 - 立即可用
34    outproc_gate: Option<OutGate>,                 // 远程 Actor 调用 - 延迟初始化
35    data_stream_registry: Arc<DataStreamRegistry>, // DataStream 回调注册表
36    media_frame_registry: Arc<MediaFrameRegistry>, // MediaTrack 回调注册表
37}
38
39impl RuntimeContext {
40    /// 创建新的 RuntimeContext
41    ///
42    /// # 参数
43    ///
44    /// - `self_id`: 当前 Actor 的 ID
45    /// - `caller_id`: 调用方 Actor ID(可选)
46    /// - `trace_id`: 分布式追踪 ID
47    /// - `request_id`: 当前请求唯一 ID
48    /// - `inproc_gate`: 进程内通信 gate(立即可用)
49    /// - `outproc_gate`: 跨进程通信 gate(可能为 None,等待 WebRTC 初始化)
50    /// - `data_stream_registry`: DataStream 回调注册表
51    /// - `media_frame_registry`: MediaTrack 回调注册表
52    #[allow(clippy::too_many_arguments)] // Internal API - all parameters are required
53    pub fn new(
54        self_id: ActrId,
55        caller_id: Option<ActrId>,
56        trace_id: String,
57        request_id: String,
58        inproc_gate: OutGate,
59        outproc_gate: Option<OutGate>,
60        data_stream_registry: Arc<DataStreamRegistry>,
61        media_frame_registry: Arc<MediaFrameRegistry>,
62    ) -> Self {
63        Self {
64            self_id,
65            caller_id,
66            trace_id,
67            request_id,
68            inproc_gate,
69            outproc_gate,
70            data_stream_registry,
71            media_frame_registry,
72        }
73    }
74
75    /// 根据 Dest 选择合适的 gate
76    ///
77    /// - Dest::Shell → inproc_gate(立即可用)
78    /// - Dest::Local → inproc_gate(立即可用)
79    /// - Dest::Actor(_) → outproc_gate(需要检查是否已初始化)
80    #[inline]
81    fn select_gate(&self, dest: &Dest) -> ActorResult<&OutGate> {
82        match dest {
83            Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
84            Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
85                ProtocolError::Actr(ActrError::GateNotInitialized {
86                    message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
87                        .to_string(),
88                })
89            }),
90        }
91    }
92
93    /// 从 Dest 提取目标 ActrId
94    ///
95    /// - Dest::Shell → self_id(Workload → App 反向调用)
96    /// - Dest::Local → self_id(调用本地 Workload)
97    /// - Dest::Actor(id) → id(远程调用)
98    #[inline]
99    fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
100        match dest {
101            Dest::Shell | Dest::Local => &self.self_id,
102            Dest::Actor(id) => id,
103        }
104    }
105}
106
107#[async_trait]
108impl Context for RuntimeContext {
109    // ========== 数据访问方法 ==========
110
111    fn self_id(&self) -> &ActrId {
112        &self.self_id
113    }
114
115    fn caller_id(&self) -> Option<&ActrId> {
116        self.caller_id.as_ref()
117    }
118
119    fn trace_id(&self) -> &str {
120        &self.trace_id
121    }
122
123    fn request_id(&self) -> &str {
124        &self.request_id
125    }
126
127    // ========== 通信能力方法 ==========
128
129    async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
130        use actr_protocol::prost::Message as ProstMessage;
131
132        // 1. 编码请求为 protobuf bytes
133        let payload: Bytes = request.encode_to_vec().into();
134
135        // 2. 从 RpcRequest trait 获取 route_key(编译时确定)
136        let route_key = R::route_key().to_string();
137
138        // 3. 构造 RpcEnvelope(继承当前 Context 的追踪信息)
139        let envelope = RpcEnvelope {
140            route_key,
141            payload: Some(payload),
142            error: None,
143            trace_id: self.trace_id.clone(), // 继承 trace_id,保持调用链追踪
144            request_id: uuid::Uuid::new_v4().to_string(), // 生成新的 request_id
145            metadata: vec![],
146            timeout_ms: 30000, // 默认 30 秒超时
147        };
148
149        // 4. 根据 Dest 选择 gate 并提取目标 ActrId(Shell/Local 立即可用,Actor 需要检查)
150        let gate = self.select_gate(target)?;
151        let target_id = self.extract_target_id(target);
152
153        // 5. 通过 OutGate enum dispatch 发送(零虚函数调用!)
154        let response_bytes = gate.send_request(target_id, envelope).await?;
155
156        // 6. 解码响应(类型安全:R::Response)
157        R::Response::decode(&*response_bytes).map_err(|e| {
158            ProtocolError::Actr(ActrError::DecodeFailure {
159                message: format!(
160                    "Failed to decode {}: {}",
161                    std::any::type_name::<R::Response>(),
162                    e
163                ),
164            })
165        })
166    }
167
168    async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
169        // 1. 编码消息
170        let payload: Bytes = message.encode_to_vec().into();
171
172        // 2. 获取 route_key
173        let route_key = R::route_key().to_string();
174
175        // 3. 构造 RpcEnvelope(fire-and-forget 语义)
176        let envelope = RpcEnvelope {
177            route_key,
178            payload: Some(payload),
179            error: None,
180            trace_id: self.trace_id.clone(),
181            request_id: uuid::Uuid::new_v4().to_string(),
182            metadata: vec![],
183            timeout_ms: 0, // 0 表示不等待响应
184        };
185
186        // 4. 根据 Dest 选择 gate 并提取目标 ActrId(Shell/Local 立即可用,Actor 需要检查)
187        let gate = self.select_gate(target)?;
188        let target_id = self.extract_target_id(target);
189
190        // 5. 通过 OutGate enum dispatch 发送
191        gate.send_message(target_id, envelope).await
192    }
193
194    // ========== Fast Path: DataStream Methods ==========
195
196    async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
197    where
198        F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
199    {
200        tracing::debug!(
201            "📊 Registering DataStream callback for stream_id: {}",
202            stream_id
203        );
204        self.data_stream_registry
205            .register(stream_id, Arc::new(callback));
206        Ok(())
207    }
208
209    async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
210        tracing::debug!(
211            "🚫 Unregistering DataStream callback for stream_id: {}",
212            stream_id
213        );
214        self.data_stream_registry.unregister(stream_id);
215        Ok(())
216    }
217
218    async fn send_data_stream(&self, target: &Dest, chunk: DataStream) -> ActorResult<()> {
219        use actr_protocol::prost::Message as ProstMessage;
220
221        // 1. Serialize DataStream to bytes
222        let payload = chunk.encode_to_vec();
223
224        tracing::debug!(
225            "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
226            chunk.stream_id,
227            chunk.sequence,
228            payload.len()
229        );
230
231        // 2. Select gate based on Dest
232        let gate = self.select_gate(target)?;
233        let target_id = self.extract_target_id(target);
234
235        // 3. Send via OutGate with appropriate PayloadType
236        // Use StreamReliable for reliable ordered transmission
237        // TODO: Allow user to choose between StreamReliable and StreamLatencyFirst
238        gate.send_data_stream(
239            target_id,
240            actr_protocol::PayloadType::StreamReliable,
241            bytes::Bytes::from(payload),
242        )
243        .await
244    }
245
246    // ========== Fast Path: MediaTrack Methods ==========
247
248    async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
249    where
250        F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
251    {
252        tracing::debug!(
253            "📹 Registering MediaTrack callback for track_id: {}",
254            track_id
255        );
256        self.media_frame_registry
257            .register(track_id, Arc::new(callback));
258        Ok(())
259    }
260
261    async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
262        tracing::debug!(
263            "📹 Unregistering MediaTrack callback for track_id: {}",
264            track_id
265        );
266        self.media_frame_registry.unregister(track_id);
267        Ok(())
268    }
269
270    async fn send_media_sample(
271        &self,
272        target: &Dest,
273        track_id: &str,
274        sample: MediaSample,
275    ) -> ActorResult<()> {
276        // 1. Select appropriate gate based on Dest
277        let gate = self.select_gate(target)?;
278
279        // 2. Extract target ActrId
280        let target_id = self.extract_target_id(target);
281
282        // 3. Send via OutGate (delegates to WebRTC Track)
283        gate.send_media_sample(target_id, track_id, sample).await
284    }
285}