actr_runtime/wire/webrtc/gate.rs
1//! WebRtcGate - WebRTC-based OutboundGate implementation
2//!
3//! Uses WebRtcCoordinator to send/receive messages, implementing cross-process RPC communication
4
5use super::coordinator::WebRtcCoordinator;
6use crate::error::{RuntimeError, RuntimeResult};
7use crate::inbound::DataStreamRegistry;
8#[cfg(feature = "opentelemetry")]
9use crate::wire::webrtc::trace::set_parent_from_rpc_envelope;
10use actr_framework::Bytes;
11use actr_mailbox::{Mailbox, MessagePriority};
12use actr_protocol::prost::Message as ProstMessage;
13use actr_protocol::{self, ActrId, ActrIdExt, DataStream, PayloadType, RpcEnvelope};
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{RwLock, oneshot};
17#[cfg(feature = "opentelemetry")]
18use tracing::Instrument as _;
19
20/// WebRTC Gate - OutboundGate implementation
21///
22/// # Responsibilities
23/// - Implement OutboundGate trait
24/// - Send messages using WebRtcCoordinator
25/// - Serialize/deserialize RpcEnvelope (Protobuf)
26/// - Track pending requests and match responses (by r equest_id)
27/// - Route messages by PayloadType (RPC → Mailbox, DataStream → Registry)
28///
29/// # Design Principles
30/// - Response reuses Request's request_id (standard RPC semantics)
31/// - Use pending_requests to distinguish: exists = Response, doesn't exist = Request
32/// - Gateway layer doesn't deserialize payloads, raw bytes go directly to Mailbox
33/// - **IMPORTANT**: pending_requests should be shared with OutprocOutGate
34pub struct WebRtcGate {
35 /// Local Actor ID
36 local_id: Arc<RwLock<Option<ActrId>>>,
37
38 /// WebRTC signaling coordinator
39 coordinator: Arc<WebRtcCoordinator>,
40
41 /// Pending requests (request_id → (target_actor_id, response channel))
42 /// Used to determine if received message is Response (key exists) or Request (key doesn't exist)
43 /// **Shared with OutprocOutGate** to ensure correct Response routing
44 /// Can send success (Ok(Bytes)) or error (Err(ProtocolError))
45 pending_requests:
46 Arc<RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>>,
47
48 /// DataStream registry for fast-path message routing
49 data_stream_registry: Arc<DataStreamRegistry>,
50}
51
52impl WebRtcGate {
53 /// Create new WebRtcGate with shared pending_requests and DataStreamRegistry
54 ///
55 /// # Arguments
56 /// - `coordinator`: WebRtcCoordinator instance
57 /// - `pending_requests`: Shared pending requests (should be same as OutprocOutGate)
58 /// - `data_stream_registry`: DataStream registry for fast-path routing
59 pub fn new(
60 coordinator: Arc<WebRtcCoordinator>,
61 pending_requests: Arc<
62 RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>,
63 >,
64 data_stream_registry: Arc<DataStreamRegistry>,
65 ) -> Self {
66 Self {
67 local_id: Arc::new(RwLock::new(None)),
68 coordinator,
69 pending_requests,
70 data_stream_registry,
71 }
72 }
73
74 /// Set local Actor ID
75 pub async fn set_local_id(&self, actor_id: ActrId) {
76 *self.local_id.write().await = Some(actor_id);
77 }
78
79 /// Handle RpcEnvelope message (Response or Request)
80 ///
81 /// # Arguments
82 /// - `envelope`: Deserialized RpcEnvelope
83 /// - `from_bytes`: Sender's ActrId bytes (for Mailbox enqueue)
84 /// - `data`: Original message bytes (for Mailbox enqueue)
85 /// - `payload_type`: PayloadType to determine priority
86 /// - `pending_requests`: Shared pending requests map
87 /// - `mailbox`: Mailbox for enqueueing requests
88 ///
89 /// # Behavior
90 /// - If request_id exists in pending_requests: Response → wake up waiting caller
91 /// - If request_id doesn't exist: Request → enqueue to Mailbox
92 async fn handle_envelope(
93 envelope: RpcEnvelope,
94 from_bytes: Vec<u8>,
95 data: Bytes,
96 payload_type: PayloadType,
97 pending_requests: Arc<
98 RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>,
99 >,
100 mailbox: Arc<dyn Mailbox>,
101 ) {
102 // Extract and set tracing context from envelope
103 #[cfg(feature = "opentelemetry")]
104 {
105 use crate::wire::webrtc::trace::set_parent_from_rpc_envelope;
106 let span = tracing::info_span!("webrtc.receive_rpc", request_id = %envelope.request_id);
107 set_parent_from_rpc_envelope(&span, &envelope);
108 let _guard = span.enter();
109 }
110 let request_id = envelope.request_id.clone();
111
112 // Determine if Response or Request
113 let mut pending = pending_requests.write().await;
114 if let Some((target, response_tx)) = pending.remove(&request_id) {
115 // Response - Wake up waiting caller (bypassing disk, fast path)
116 drop(pending); // Release lock
117 tracing::debug!(
118 "📬 Received RPC Response: request_id={}, target={}",
119 request_id,
120 target.to_string_repr()
121 );
122
123 // Convert envelope to result
124 let result = match (envelope.payload, envelope.error) {
125 (Some(payload), None) => Ok(payload),
126 (None, Some(error)) => Err(actr_protocol::ProtocolError::TransportError(format!(
127 "RPC error {}: {}",
128 error.code, error.message
129 ))),
130 _ => Err(actr_protocol::ProtocolError::DecodeError(
131 "Invalid RpcEnvelope: payload and error fields inconsistent".to_string(),
132 )),
133 };
134 let _ = response_tx.send(result);
135 } else {
136 // Request - Enqueue to Mailbox (pass raw bytes, zero overhead)
137 drop(pending); // Release lock
138 tracing::debug!("📥 Received RPC Request: request_id={}", request_id);
139
140 // Determine priority based on PayloadType
141 let priority = match payload_type {
142 PayloadType::RpcSignal => MessagePriority::High,
143 PayloadType::RpcReliable => MessagePriority::Normal,
144 _ => MessagePriority::Normal,
145 };
146
147 // Enqueue to Mailbox (from_bytes and data are original bytes, zero overhead)
148 // Convert Bytes to Vec<u8> (Mailbox uses Vec)
149 match mailbox.enqueue(from_bytes, data.to_vec(), priority).await {
150 Ok(msg_id) => {
151 tracing::debug!(
152 "✅ RPC message enqueued to Mailbox: msg_id={}, priority={:?}",
153 msg_id,
154 priority
155 );
156 }
157 Err(e) => {
158 tracing::error!("❌ Mailbox enqueue failed: {:?}", e);
159 }
160 }
161 }
162 }
163
164 /// Start message receive loop (called by ActrSystem/ActrNode)
165 ///
166 /// # Arguments
167 /// - `mailbox`: message queue for persisting inbound requests
168 ///
169 /// # Architecture
170 /// According to three-loop architecture design (framework-runtime-architecture.zh.md):
171 /// - WebRtcGate belongs to outer loop (Transport layer)
172 /// - Mailbox belongs to inner loop (state path)
173 /// - Message flow: WebRTC → WebRtcGate → Mailbox/DataStreamRegistry → Scheduler → ActrNode
174 ///
175 /// # Message Routing Logic
176 /// - Route based on PayloadType:
177 /// - RpcReliable/RpcSignal: Deserialize RpcEnvelope, check pending_requests, enqueue to Mailbox
178 /// - StreamReliable/StreamLatencyFirst: Deserialize DataStream, dispatch to DataStreamRegistry
179 pub async fn start_receive_loop(&self, mailbox: Arc<dyn Mailbox>) -> RuntimeResult<()> {
180 let coordinator = self.coordinator.clone();
181 let pending_requests = self.pending_requests.clone();
182 let data_stream_registry = self.data_stream_registry.clone();
183
184 tokio::spawn(async move {
185 loop {
186 // Receive message from WebRtcCoordinator (now includes PayloadType)
187 match coordinator.receive_message().await {
188 Ok(Some((from_bytes, data, payload_type))) => {
189 tracing::debug!(
190 "📨 WebRtcGate received message: {} bytes, PayloadType: {:?}",
191 data.len(),
192 payload_type
193 );
194
195 // Route based on PayloadType
196 match payload_type {
197 PayloadType::RpcReliable | PayloadType::RpcSignal => {
198 // RPC path: deserialize RpcEnvelope and route
199 match RpcEnvelope::decode(&data[..]) {
200 Ok(envelope) => {
201 #[cfg(feature = "opentelemetry")]
202 let span = {
203 let span = tracing::info_span!("webrtc.receive_rpc");
204 set_parent_from_rpc_envelope(&span, &envelope);
205 span
206 };
207 let handle_envelope_fut = Self::handle_envelope(
208 envelope,
209 from_bytes,
210 data,
211 payload_type,
212 pending_requests.clone(),
213 mailbox.clone(),
214 );
215 #[cfg(feature = "opentelemetry")]
216 let handle_envelope_fut =
217 handle_envelope_fut.instrument(span);
218
219 handle_envelope_fut.await;
220 }
221 Err(e) => {
222 tracing::error!(
223 "❌ Failed to deserialize RpcEnvelope: {:?}",
224 e
225 );
226 }
227 }
228 }
229 PayloadType::StreamReliable | PayloadType::StreamLatencyFirst => {
230 // DataStream path: deserialize and dispatch to registry
231 match DataStream::decode(&data[..]) {
232 Ok(chunk) => {
233 tracing::debug!(
234 "📦 Received DataStream: stream_id={}, seq={}, {} bytes",
235 chunk.stream_id,
236 chunk.sequence,
237 chunk.payload.len()
238 );
239
240 // Decode sender ActrId
241 match ActrId::decode(&from_bytes[..]) {
242 Ok(sender_id) => {
243 // Dispatch to DataStreamRegistry (async callback invocation)
244 data_stream_registry
245 .dispatch(chunk, sender_id)
246 .await;
247 }
248 Err(e) => {
249 tracing::error!(
250 "❌ Failed to decode sender ActrId: {:?}",
251 e
252 );
253 }
254 }
255 }
256 Err(e) => {
257 tracing::error!(
258 "❌ Failed to deserialize DataStream: {:?}",
259 e
260 );
261 }
262 }
263 }
264 PayloadType::MediaRtp => {
265 tracing::warn!(
266 "⚠️ MediaRtp received in WebRtcGate (should use RTCTrackRemote)"
267 );
268 }
269 }
270 }
271 Ok(None) => {
272 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
273 }
274 Err(e) => {
275 tracing::error!("❌ Message receive failed: {:?}", e);
276 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
277 }
278 }
279 }
280 });
281
282 Ok(())
283 }
284
285 /// Send response (called by Mailbox handler loop)
286 ///
287 /// # Arguments
288 /// - `target`: response target ActrId (original request sender)
289 /// - `response_envelope`: response RpcEnvelope (**must reuse original request_id**)
290 ///
291 /// # Design Principle
292 /// - Response reuses Request's request_id (caller is responsible)
293 /// - Receiver matches to pending_requests by request_id and wakes up waiting caller
294 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
295 pub async fn send_response(
296 &self,
297 target: &ActrId,
298 response_envelope: RpcEnvelope,
299 ) -> RuntimeResult<()> {
300 // Serialize RpcEnvelope (Protobuf)
301 let mut buf = Vec::new();
302 response_envelope
303 .encode(&mut buf)
304 .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to encode response: {e}")))?;
305
306 // Send
307 self.coordinator.send_message(target, &buf).await?;
308 tracing::debug!(
309 "📤 Sent response: request_id={}, {} bytes",
310 response_envelope.request_id,
311 buf.len()
312 );
313 Ok(())
314 }
315}