actr_runtime/wire/webrtc/gate.rs
1//! WebRtcGate - WebRTC-based OutboundGate implementation
2//!
3//! Uses WebRtcCoordinator to send/receive messages, implementing cross-process RPC communication
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{RwLock, oneshot};
8
9use super::coordinator::WebRtcCoordinator;
10use crate::error::{RuntimeError, RuntimeResult};
11use crate::inbound::DataStreamRegistry;
12use actr_framework::Bytes;
13use actr_mailbox::{Mailbox, MessagePriority};
14use actr_protocol::prost::Message as ProstMessage;
15use actr_protocol::{
16 self, ActorResult, ActrId, DataStream, PayloadType, ProtocolError, RpcEnvelope,
17};
18
19/// WebRTC Gate - OutboundGate implementation
20///
21/// # Responsibilities
22/// - Implement OutboundGate trait
23/// - Send messages using WebRtcCoordinator
24/// - Serialize/deserialize RpcEnvelope (Protobuf)
25/// - Track pending requests and match responses (by request_id)
26/// - Route messages by PayloadType (RPC → Mailbox, DataStream → Registry)
27///
28/// # Design Principles
29/// - Response reuses Request's request_id (standard RPC semantics)
30/// - Use pending_requests to distinguish: exists = Response, doesn't exist = Request
31/// - Gateway layer doesn't deserialize payloads, raw bytes go directly to Mailbox
32/// - **IMPORTANT**: pending_requests should be shared with OutprocOutGate
33pub struct WebRtcGate {
34 /// Local Actor ID
35 local_id: Arc<RwLock<Option<ActrId>>>,
36
37 /// WebRTC signaling coordinator
38 coordinator: Arc<WebRtcCoordinator>,
39
40 /// Pending requests (request_id → response channel)
41 /// Used to determine if received message is Response (key exists) or Request (key doesn't exist)
42 /// **Shared with OutprocOutGate** to ensure correct Response routing
43 /// Can send success (Ok(Bytes)) or error (Err(ProtocolError))
44 pending_requests:
45 Arc<RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>>,
46
47 /// DataStream registry for fast-path message routing
48 data_stream_registry: Arc<DataStreamRegistry>,
49}
50
51impl WebRtcGate {
52 /// Create new WebRtcGate with shared pending_requests and DataStreamRegistry
53 ///
54 /// # Arguments
55 /// - `coordinator`: WebRtcCoordinator instance
56 /// - `pending_requests`: Shared pending requests (should be same as OutprocOutGate)
57 /// - `data_stream_registry`: DataStream registry for fast-path routing
58 pub fn new(
59 coordinator: Arc<WebRtcCoordinator>,
60 pending_requests: Arc<
61 RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>,
62 >,
63 data_stream_registry: Arc<DataStreamRegistry>,
64 ) -> Self {
65 Self {
66 local_id: Arc::new(RwLock::new(None)),
67 coordinator,
68 pending_requests,
69 data_stream_registry,
70 }
71 }
72
73 /// Set local Actor ID
74 pub async fn set_local_id(&self, actor_id: ActrId) {
75 *self.local_id.write().await = Some(actor_id);
76 }
77
78 /// Start message receive loop (called by ActrSystem/ActrNode)
79 ///
80 /// # Arguments
81 /// - `mailbox`: message queue for persisting inbound requests
82 ///
83 /// # Architecture
84 /// According to three-loop architecture design (framework-runtime-architecture.zh.md):
85 /// - WebRtcGate belongs to outer loop (Transport layer)
86 /// - Mailbox belongs to inner loop (state path)
87 /// - Message flow: WebRTC → WebRtcGate → Mailbox/DataStreamRegistry → Scheduler → ActrNode
88 ///
89 /// # Message Routing Logic
90 /// - Route based on PayloadType:
91 /// - RpcReliable/RpcSignal: Deserialize RpcEnvelope, check pending_requests, enqueue to Mailbox
92 /// - StreamReliable/StreamLatencyFirst: Deserialize DataStream, dispatch to DataStreamRegistry
93 pub async fn start_receive_loop(&self, mailbox: Arc<dyn Mailbox>) -> RuntimeResult<()> {
94 let coordinator = self.coordinator.clone();
95 let pending_requests = self.pending_requests.clone();
96 let data_stream_registry = self.data_stream_registry.clone();
97
98 tokio::spawn(async move {
99 loop {
100 // Receive message from WebRtcCoordinator (now includes PayloadType)
101 match coordinator.receive_message().await {
102 Ok(Some((from_bytes, data, payload_type))) => {
103 tracing::debug!(
104 "📨 WebRtcGate received message: {} bytes, PayloadType: {:?}",
105 data.len(),
106 payload_type
107 );
108
109 // Route based on PayloadType
110 match payload_type {
111 PayloadType::RpcReliable | PayloadType::RpcSignal => {
112 // RPC path: deserialize RpcEnvelope and route
113 match RpcEnvelope::decode(&data[..]) {
114 Ok(envelope) => {
115 let request_id = envelope.request_id.clone();
116
117 // Determine if Response or Request
118 let mut pending = pending_requests.write().await;
119 if let Some(response_tx) = pending.remove(&request_id) {
120 // Response - Wake up waiting caller (bypassing disk, fast path)
121 drop(pending); // Release lock
122 tracing::debug!(
123 "📬 Received RPC Response: request_id={}",
124 request_id
125 );
126
127 // Convert envelope to result
128 let result = match (envelope.payload, envelope.error) {
129 (Some(payload), None) => Ok(payload),
130 (None, Some(error)) => {
131 Err(actr_protocol::ProtocolError::TransportError(
132 format!(
133 "RPC error {}: {}",
134 error.code, error.message
135 ),
136 ))
137 }
138 _ => Err(actr_protocol::ProtocolError::DecodeError(
139 "Invalid RpcEnvelope: payload and error fields inconsistent"
140 .to_string(),
141 )),
142 };
143 let _ = response_tx.send(result);
144 } else {
145 // Request - Enqueue to Mailbox (pass raw bytes, zero overhead)
146 drop(pending); // Release lock
147 tracing::debug!(
148 "📥 Received RPC Request: request_id={}",
149 request_id
150 );
151
152 // Determine priority based on PayloadType
153 let priority = match payload_type {
154 PayloadType::RpcSignal => MessagePriority::High,
155 PayloadType::RpcReliable => MessagePriority::Normal,
156 _ => MessagePriority::Normal,
157 };
158
159 // Enqueue to Mailbox (from_bytes and data are original bytes, zero overhead)
160 // Convert Bytes to Vec<u8> (Mailbox uses Vec)
161 match mailbox
162 .enqueue(
163 from_bytes.clone(),
164 data.to_vec(),
165 priority,
166 )
167 .await
168 {
169 Ok(msg_id) => {
170 tracing::debug!(
171 "✅ RPC message enqueued to Mailbox: msg_id={}, priority={:?}",
172 msg_id,
173 priority
174 );
175 }
176 Err(e) => {
177 tracing::error!(
178 "❌ Mailbox enqueue failed: {:?}",
179 e
180 );
181 }
182 }
183 }
184 }
185 Err(e) => {
186 tracing::error!(
187 "❌ Failed to deserialize RpcEnvelope: {:?}",
188 e
189 );
190 }
191 }
192 }
193 PayloadType::StreamReliable | PayloadType::StreamLatencyFirst => {
194 // DataStream path: deserialize and dispatch to registry
195 match DataStream::decode(&data[..]) {
196 Ok(chunk) => {
197 tracing::debug!(
198 "📦 Received DataStream: stream_id={}, seq={}, {} bytes",
199 chunk.stream_id,
200 chunk.sequence,
201 chunk.payload.len()
202 );
203
204 // Decode sender ActrId
205 match ActrId::decode(&from_bytes[..]) {
206 Ok(sender_id) => {
207 // Dispatch to DataStreamRegistry (async callback invocation)
208 data_stream_registry
209 .dispatch(chunk, sender_id)
210 .await;
211 }
212 Err(e) => {
213 tracing::error!(
214 "❌ Failed to decode sender ActrId: {:?}",
215 e
216 );
217 }
218 }
219 }
220 Err(e) => {
221 tracing::error!(
222 "❌ Failed to deserialize DataStream: {:?}",
223 e
224 );
225 }
226 }
227 }
228 PayloadType::MediaRtp => {
229 tracing::warn!(
230 "⚠️ MediaRtp received in WebRtcGate (should use RTCTrackRemote)"
231 );
232 }
233 }
234 }
235 Ok(None) => {
236 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
237 }
238 Err(e) => {
239 tracing::error!("❌ Message receive failed: {:?}", e);
240 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
241 }
242 }
243 }
244 });
245
246 Ok(())
247 }
248
249 /// Send response (called by Mailbox handler loop)
250 ///
251 /// # Arguments
252 /// - `target`: response target ActrId (original request sender)
253 /// - `response_envelope`: response RpcEnvelope (**must reuse original request_id**)
254 ///
255 /// # Design Principle
256 /// - Response reuses Request's request_id (caller is responsible)
257 /// - Receiver matches to pending_requests by request_id and wakes up waiting caller
258 pub async fn send_response(
259 &self,
260 target: &ActrId,
261 response_envelope: RpcEnvelope,
262 ) -> RuntimeResult<()> {
263 // Serialize RpcEnvelope (Protobuf)
264 let mut buf = Vec::new();
265 response_envelope
266 .encode(&mut buf)
267 .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to encode response: {e}")))?;
268
269 // Send
270 self.coordinator.send_message(target, &buf).await?;
271 tracing::debug!(
272 "📤 Sent response: request_id={}, {} bytes",
273 response_envelope.request_id,
274 buf.len()
275 );
276 Ok(())
277 }
278}
279
280impl WebRtcGate {
281 /// Send request and wait for response (bidirectional communication)
282 ///
283 /// # Implementation Details
284 /// - RpcEnvelope's request_id uniquely identifies this RPC call
285 /// - Response will reuse the same request_id, receiver matches via pending_requests
286 /// - 30 second timeout protection
287 pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
288 let request_id = envelope.request_id.clone();
289 tracing::debug!(
290 "📤 WebRtcGate::send_request to {:?}, request_id={}",
291 target,
292 request_id
293 );
294
295 // Create response channel and register
296 let (response_tx, response_rx) = oneshot::channel();
297 {
298 let mut pending = self.pending_requests.write().await;
299 pending.insert(request_id.clone(), response_tx);
300 }
301
302 // Serialize RpcEnvelope (Protobuf)
303 let mut buf = Vec::new();
304 envelope
305 .encode(&mut buf)
306 .map_err(|e| ProtocolError::SerializationError(e.to_string()))?;
307
308 // Send message
309 self.coordinator
310 .send_message(target, &buf)
311 .await
312 .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
313
314 tracing::debug!(
315 "📤 Sent request: request_id={}, {} bytes",
316 request_id,
317 buf.len()
318 );
319
320 // Wait for response (30 second timeout)
321 match tokio::time::timeout(std::time::Duration::from_secs(30), response_rx).await {
322 Ok(Ok(result)) => {
323 // result is ActorResult<Bytes>, propagate it
324 tracing::debug!("📬 Received response: request_id={}", request_id);
325 result
326 }
327 Ok(Err(_)) => {
328 // Cleanup pending request
329 self.pending_requests.write().await.remove(&request_id);
330 Err(ProtocolError::TransportError(
331 "Response channel closed".to_string(),
332 ))
333 }
334 Err(_) => {
335 // Timeout, cleanup pending request
336 self.pending_requests.write().await.remove(&request_id);
337 Err(ProtocolError::TransportError("Request timeout".to_string()))
338 }
339 }
340 }
341
342 /// Send one-way message (no response expected)
343 ///
344 /// # Implementation Details
345 /// - Different from send_request only in that it doesn't register pending_requests and doesn't wait for response
346 /// - RpcEnvelope request_id still needs to be set (for logging and tracing)
347 pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
348 let request_id = envelope.request_id.clone();
349 tracing::debug!(
350 "📤 WebRtcGate::send_message to {:?}, request_id={}",
351 target,
352 request_id
353 );
354
355 // Serialize RpcEnvelope (Protobuf)
356 let mut buf = Vec::new();
357 envelope
358 .encode(&mut buf)
359 .map_err(|e| ProtocolError::SerializationError(e.to_string()))?;
360
361 // Send message (don't register pending_requests)
362 self.coordinator
363 .send_message(target, &buf)
364 .await
365 .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
366
367 tracing::debug!(
368 "📤 Sent one-way message: request_id={}, {} bytes",
369 request_id,
370 buf.len()
371 );
372 Ok(())
373 }
374}