actr_runtime/outbound/mod.rs
1//! Outbound Layer 2: Outbound gate abstraction layer
2//!
3//! 提供统一的出站消息发送接口,支持进程内和跨进程通信。
4//!
5//! # 设计特性
6//!
7//! - **Enum Dispatch**:使用枚举而非 trait object,实现零虚函数调用
8//! - **零成本抽象**:编译时确定类型,静态分发
9//! - **统一接口**:InprocOut 和 OutprocOut 共享相同的方法签名
10
11mod inproc_out_gate;
12mod outproc_out_gate;
13
14pub use inproc_out_gate::InprocOutGate;
15pub use outproc_out_gate::OutprocOutGate;
16
17use actr_framework::{Bytes, MediaSample};
18use actr_protocol::{ActorResult, ActrId, PayloadType, RpcEnvelope};
19use std::sync::Arc;
20
21/// OutGate - 出站消息门枚举
22///
23/// # 设计原则
24///
25/// - 使用 **enum dispatch** 而非 trait object,避免虚函数调用
26/// - **零成本抽象**:编译时准确确定类型
27/// - **完全独立**:仅用于出站(Outbound),不包含任何入站路由逻辑
28///
29/// # 性能
30///
31/// ```text
32/// OutGate::send_request() 内部:
33/// match self {
34/// OutGate::InprocOut(gate) => gate.send_request(...), // ← 静态分发
35/// OutGate::OutprocOut(gate) => gate.send_request(...), // ← 静态分发
36/// }
37///
38/// 性能:
39/// - 无虚函数表查找
40/// - 编译器完全内联
41/// - CPU 分支预测命中率 >95%
42/// ```
43#[derive(Clone)]
44pub enum OutGate {
45 /// InprocOut - 进程内传输(零序列化,出站)
46 InprocOut(Arc<InprocOutGate>),
47
48 /// OutprocOut - 跨进程传输(Protobuf 序列化,出站)
49 OutprocOut(Arc<OutprocOutGate>),
50}
51
52impl OutGate {
53 /// 发送请求并等待响应
54 ///
55 /// # 参数
56 ///
57 /// - `target`: 目标 Actor ID
58 /// - `envelope`: 消息信封(包含 route_key 和 payload)
59 ///
60 /// # 返回
61 ///
62 /// 返回响应的字节数据
63 ///
64 /// # 实现
65 ///
66 /// 使用 enum dispatch 静态分发到对应的实现:
67 /// - `InprocOut`: 零序列化,直接传递 RpcEnvelope
68 /// - `OutprocOut`: Protobuf 序列化,通过 Transport 层发送
69 #[cfg_attr(
70 feature = "opentelemetry",
71 tracing::instrument(skip_all, name = "OutGate.send_request")
72 )]
73 pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
74 match self {
75 OutGate::InprocOut(gate) => gate.send_request(target, envelope).await,
76 OutGate::OutprocOut(gate) => gate.send_request(target, envelope).await,
77 }
78 }
79
80 /// 发送请求并等待响应(显式指定 PayloadType)
81 pub async fn send_request_with_type(
82 &self,
83 target_id: &ActrId,
84 payload_type: PayloadType,
85 envelope: RpcEnvelope,
86 ) -> ActorResult<Bytes> {
87 match self {
88 OutGate::InprocOut(gate) => {
89 gate.send_request_with_type(target_id, payload_type, None, envelope)
90 .await
91 }
92 OutGate::OutprocOut(gate) => {
93 gate.send_request_with_type(target_id, payload_type, envelope)
94 .await
95 }
96 }
97 }
98
99 /// 发送单向消息(不等待响应)
100 ///
101 /// # 参数
102 ///
103 /// - `target`: 目标 Actor ID
104 /// - `envelope`: 消息信封
105 ///
106 /// # 语义
107 ///
108 /// Fire-and-forget:发送后立即返回,不等待响应
109 #[cfg_attr(
110 feature = "opentelemetry",
111 tracing::instrument(skip_all, name = "OutGate.send_message")
112 )]
113 pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
114 match self {
115 OutGate::InprocOut(gate) => gate.send_message(target, envelope).await,
116 OutGate::OutprocOut(gate) => gate.send_message(target, envelope).await,
117 }
118 }
119
120 /// 发送单向消息(显式指定 PayloadType)
121 pub async fn send_message_with_type(
122 &self,
123 target: &ActrId,
124 payload_type: PayloadType,
125 envelope: RpcEnvelope,
126 ) -> ActorResult<()> {
127 match self {
128 OutGate::InprocOut(gate) => {
129 gate.send_message_with_type(target, payload_type, None, envelope)
130 .await
131 }
132 OutGate::OutprocOut(gate) => {
133 gate.send_message_with_type(target, payload_type, envelope)
134 .await
135 }
136 }
137 }
138
139 /// 发送媒体样本(WebRTC native media)
140 ///
141 /// # 参数
142 ///
143 /// - `target`: 目标 Actor ID
144 /// - `track_id`: Media track 标识符
145 /// - `sample`: 媒体样本数据
146 ///
147 /// # 语义
148 ///
149 /// - 仅支持 OutprocOut(WebRTC)
150 /// - InprocOut 返回 NotImplemented 错误
151 /// - 使用 WebRTC RTCRtpSender 发送,无 protobuf 开销
152 pub async fn send_media_sample(
153 &self,
154 target: &ActrId,
155 track_id: &str,
156 sample: MediaSample,
157 ) -> ActorResult<()> {
158 match self {
159 OutGate::InprocOut(_gate) => {
160 // InprocOut does not support MediaTrack (WebRTC-specific feature)
161 Err(actr_protocol::ProtocolError::Actr(
162 actr_protocol::ActrError::NotImplemented {
163 feature: "MediaTrack is only supported for remote actors via WebRTC"
164 .to_string(),
165 },
166 ))
167 }
168 OutGate::OutprocOut(gate) => gate.send_media_sample(target, track_id, sample).await,
169 }
170 }
171
172 /// 发送 DataStream(Fast Path 数据流)
173 ///
174 /// # 参数
175 ///
176 /// - `target`: 目标 Actor ID
177 /// - `payload_type`: PayloadType (StreamReliable 或 StreamLatencyFirst)
178 /// - `data`: 序列化后的 DataStream bytes
179 ///
180 /// # 语义
181 ///
182 /// - InprocOut: 通过 mpsc channel 发送
183 /// - OutprocOut: 通过 WebRTC DataChannel 或 WebSocket 发送
184 pub async fn send_data_stream(
185 &self,
186 target: &ActrId,
187 payload_type: actr_protocol::PayloadType,
188 data: Bytes,
189 ) -> ActorResult<()> {
190 match self {
191 OutGate::InprocOut(gate) => gate.send_data_stream(target, payload_type, data).await,
192 OutGate::OutprocOut(gate) => gate.send_data_stream(target, payload_type, data).await,
193 }
194 }
195}