actr_runtime/outbound/
mod.rs

1//! Outbound Layer 2: Outbound gate abstraction layer
2//!
3//! 提供统一的出站消息发送接口,支持进程内和跨进程通信。
4//!
5//! # 设计特性
6//!
7//! - **Enum Dispatch**:使用枚举而非 trait object,实现零虚函数调用
8//! - **零成本抽象**:编译时确定类型,静态分发
9//! - **统一接口**:InprocOut 和 OutprocOut 共享相同的方法签名
10
11mod inproc_out_gate;
12mod outproc_out_gate;
13
14pub use inproc_out_gate::InprocOutGate;
15pub use outproc_out_gate::OutprocOutGate;
16
17use actr_framework::{Bytes, MediaSample};
18use actr_protocol::{ActorResult, ActrId, PayloadType, RpcEnvelope};
19use std::sync::Arc;
20
21/// OutGate - 出站消息门枚举
22///
23/// # 设计原则
24///
25/// - 使用 **enum dispatch** 而非 trait object,避免虚函数调用
26/// - **零成本抽象**:编译时准确确定类型
27/// - **完全独立**:仅用于出站(Outbound),不包含任何入站路由逻辑
28///
29/// # 性能
30///
31/// ```text
32/// OutGate::send_request() 内部:
33///   match self {
34///       OutGate::InprocOut(gate) => gate.send_request(...),   // ← 静态分发
35///       OutGate::OutprocOut(gate) => gate.send_request(...),  // ← 静态分发
36///   }
37///
38/// 性能:
39///   - 无虚函数表查找
40///   - 编译器完全内联
41///   - CPU 分支预测命中率 >95%
42/// ```
43#[derive(Clone)]
44pub enum OutGate {
45    /// InprocOut - 进程内传输(零序列化,出站)
46    InprocOut(Arc<InprocOutGate>),
47
48    /// OutprocOut - 跨进程传输(Protobuf 序列化,出站)
49    OutprocOut(Arc<OutprocOutGate>),
50}
51
52impl OutGate {
53    /// 发送请求并等待响应
54    ///
55    /// # 参数
56    ///
57    /// - `target`: 目标 Actor ID
58    /// - `envelope`: 消息信封(包含 route_key 和 payload)
59    ///
60    /// # 返回
61    ///
62    /// 返回响应的字节数据
63    ///
64    /// # 实现
65    ///
66    /// 使用 enum dispatch 静态分发到对应的实现:
67    /// - `InprocOut`: 零序列化,直接传递 RpcEnvelope
68    /// - `OutprocOut`: Protobuf 序列化,通过 Transport 层发送
69    #[cfg_attr(
70        feature = "opentelemetry",
71        tracing::instrument(skip_all, name = "OutGate.send_request")
72    )]
73    pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
74        match self {
75            OutGate::InprocOut(gate) => gate.send_request(target, envelope).await,
76            OutGate::OutprocOut(gate) => gate.send_request(target, envelope).await,
77        }
78    }
79
80    /// 发送请求并等待响应(显式指定 PayloadType)
81    pub async fn send_request_with_type(
82        &self,
83        target_id: &ActrId,
84        payload_type: PayloadType,
85        envelope: RpcEnvelope,
86    ) -> ActorResult<Bytes> {
87        match self {
88            OutGate::InprocOut(gate) => {
89                gate.send_request_with_type(target_id, payload_type, None, envelope)
90                    .await
91            }
92            OutGate::OutprocOut(gate) => {
93                gate.send_request_with_type(target_id, payload_type, envelope)
94                    .await
95            }
96        }
97    }
98
99    /// 发送单向消息(不等待响应)
100    ///
101    /// # 参数
102    ///
103    /// - `target`: 目标 Actor ID
104    /// - `envelope`: 消息信封
105    ///
106    /// # 语义
107    ///
108    /// Fire-and-forget:发送后立即返回,不等待响应
109    #[cfg_attr(
110        feature = "opentelemetry",
111        tracing::instrument(skip_all, name = "OutGate.send_message")
112    )]
113    pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
114        match self {
115            OutGate::InprocOut(gate) => gate.send_message(target, envelope).await,
116            OutGate::OutprocOut(gate) => gate.send_message(target, envelope).await,
117        }
118    }
119
120    /// 发送单向消息(显式指定 PayloadType)
121    pub async fn send_message_with_type(
122        &self,
123        target: &ActrId,
124        payload_type: PayloadType,
125        envelope: RpcEnvelope,
126    ) -> ActorResult<()> {
127        match self {
128            OutGate::InprocOut(gate) => {
129                gate.send_message_with_type(target, payload_type, None, envelope)
130                    .await
131            }
132            OutGate::OutprocOut(gate) => {
133                gate.send_message_with_type(target, payload_type, envelope)
134                    .await
135            }
136        }
137    }
138
139    /// 发送媒体样本(WebRTC native media)
140    ///
141    /// # 参数
142    ///
143    /// - `target`: 目标 Actor ID
144    /// - `track_id`: Media track 标识符
145    /// - `sample`: 媒体样本数据
146    ///
147    /// # 语义
148    ///
149    /// - 仅支持 OutprocOut(WebRTC)
150    /// - InprocOut 返回 NotImplemented 错误
151    /// - 使用 WebRTC RTCRtpSender 发送,无 protobuf 开销
152    pub async fn send_media_sample(
153        &self,
154        target: &ActrId,
155        track_id: &str,
156        sample: MediaSample,
157    ) -> ActorResult<()> {
158        match self {
159            OutGate::InprocOut(_gate) => {
160                // InprocOut does not support MediaTrack (WebRTC-specific feature)
161                Err(actr_protocol::ProtocolError::Actr(
162                    actr_protocol::ActrError::NotImplemented {
163                        feature: "MediaTrack is only supported for remote actors via WebRTC"
164                            .to_string(),
165                    },
166                ))
167            }
168            OutGate::OutprocOut(gate) => gate.send_media_sample(target, track_id, sample).await,
169        }
170    }
171
172    /// 发送 DataStream(Fast Path 数据流)
173    ///
174    /// # 参数
175    ///
176    /// - `target`: 目标 Actor ID
177    /// - `payload_type`: PayloadType (StreamReliable 或 StreamLatencyFirst)
178    /// - `data`: 序列化后的 DataStream bytes
179    ///
180    /// # 语义
181    ///
182    /// - InprocOut: 通过 mpsc channel 发送
183    /// - OutprocOut: 通过 WebRTC DataChannel 或 WebSocket 发送
184    pub async fn send_data_stream(
185        &self,
186        target: &ActrId,
187        payload_type: actr_protocol::PayloadType,
188        data: Bytes,
189    ) -> ActorResult<()> {
190        match self {
191            OutGate::InprocOut(gate) => gate.send_data_stream(target, payload_type, data).await,
192            OutGate::OutprocOut(gate) => gate.send_data_stream(target, payload_type, data).await,
193        }
194    }
195}