actr_runtime/wire/webrtc/
gate.rs

1//! WebRtcGate - WebRTC-based OutboundGate implementation
2//!
3//! Uses WebRtcCoordinator to send/receive messages, implementing cross-process RPC communication
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{RwLock, oneshot};
8use tracing::instrument;
9
10use super::coordinator::WebRtcCoordinator;
11use crate::error::{RuntimeError, RuntimeResult};
12use crate::inbound::DataStreamRegistry;
13use actr_framework::Bytes;
14use actr_mailbox::{Mailbox, MessagePriority};
15use actr_protocol::prost::Message as ProstMessage;
16use actr_protocol::{
17    self, ActorResult, ActrId, ActrIdExt, DataStream, PayloadType, ProtocolError, RpcEnvelope,
18};
19
/// WebRTC Gate - OutboundGate implementation
///
/// Sends and receives Protobuf-encoded messages through a `WebRtcCoordinator`
/// and routes inbound traffic according to its `PayloadType`.
///
/// # Responsibilities
/// - Implement OutboundGate trait
///   (NOTE(review): the trait impl is not in this part of the file — confirm where it lives)
/// - Send messages using WebRtcCoordinator
/// - Serialize/deserialize RpcEnvelope (Protobuf)
/// - Track pending requests and match responses (by request_id)
/// - Route messages by PayloadType (RPC → Mailbox, DataStream → Registry)
///
/// # Design Principles
/// - Response reuses Request's request_id (standard RPC semantics)
/// - Use pending_requests to distinguish: exists = Response, doesn't exist = Request
/// - Gateway layer doesn't deserialize payloads, raw bytes go directly to Mailbox
/// - **IMPORTANT**: pending_requests should be shared with OutprocOutGate
pub struct WebRtcGate {
    /// Local Actor ID
    ///
    /// Starts as `None` and is filled in by `set_local_id`.
    /// NOTE(review): nothing in this part of the file reads it back — confirm it is still needed.
    local_id: Arc<RwLock<Option<ActrId>>>,

    /// WebRTC signaling coordinator
    ///
    /// All outbound sends and the inbound receive loop go through this handle.
    coordinator: Arc<WebRtcCoordinator>,

    /// Pending requests (request_id → response channel)
    /// Used to determine if received message is Response (key exists) or Request (key doesn't exist)
    /// **Shared with OutprocOutGate** to ensure correct Response routing
    /// Can send success (Ok(Bytes)) or error (Err(ProtocolError))
    ///
    /// An entry is inserted by `send_request` and removed either when the matching
    /// response arrives or when the caller gives up waiting.
    pending_requests:
        Arc<RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>>,

    /// DataStream registry for fast-path message routing
    ///
    /// Decoded `DataStream` chunks are dispatched here instead of going through the Mailbox.
    data_stream_registry: Arc<DataStreamRegistry>,
}
51
52impl WebRtcGate {
53    /// Create new WebRtcGate with shared pending_requests and DataStreamRegistry
54    ///
55    /// # Arguments
56    /// - `coordinator`: WebRtcCoordinator instance
57    /// - `pending_requests`: Shared pending requests (should be same as OutprocOutGate)
58    /// - `data_stream_registry`: DataStream registry for fast-path routing
59    pub fn new(
60        coordinator: Arc<WebRtcCoordinator>,
61        pending_requests: Arc<
62            RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>,
63        >,
64        data_stream_registry: Arc<DataStreamRegistry>,
65    ) -> Self {
66        Self {
67            local_id: Arc::new(RwLock::new(None)),
68            coordinator,
69            pending_requests,
70            data_stream_registry,
71        }
72    }
73
74    /// Set local Actor ID
75    pub async fn set_local_id(&self, actor_id: ActrId) {
76        *self.local_id.write().await = Some(actor_id);
77    }
78
79    /// Handle RpcEnvelope message (Response or Request)
80    ///
81    /// # Arguments
82    /// - `envelope`: Deserialized RpcEnvelope
83    /// - `from_bytes`: Sender's ActrId bytes (for Mailbox enqueue)
84    /// - `data`: Original message bytes (for Mailbox enqueue)
85    /// - `payload_type`: PayloadType to determine priority
86    /// - `pending_requests`: Shared pending requests map
87    /// - `mailbox`: Mailbox for enqueueing requests
88    ///
89    /// # Behavior
90    /// - If request_id exists in pending_requests: Response → wake up waiting caller
91    /// - If request_id doesn't exist: Request → enqueue to Mailbox
92    async fn handle_envelope(
93        envelope: RpcEnvelope,
94        from_bytes: Vec<u8>,
95        data: Bytes,
96        payload_type: PayloadType,
97        pending_requests: Arc<
98            RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>,
99        >,
100        mailbox: Arc<dyn Mailbox>,
101    ) {
102        // Extract and set tracing context from envelope
103        #[cfg(feature = "opentelemetry")]
104        {
105            use crate::wire::webrtc::trace::set_parent_from_rpc_envelope;
106            let span = tracing::info_span!("webrtc.receive_rpc", request_id = %envelope.request_id);
107            set_parent_from_rpc_envelope(&span, &envelope);
108            let _guard = span.enter();
109        }
110        let request_id = envelope.request_id.clone();
111
112        // Determine if Response or Request
113        let mut pending = pending_requests.write().await;
114        if let Some(response_tx) = pending.remove(&request_id) {
115            // Response - Wake up waiting caller (bypassing disk, fast path)
116            drop(pending); // Release lock
117            tracing::debug!("📬 Received RPC Response: request_id={}", request_id);
118
119            // Convert envelope to result
120            let result = match (envelope.payload, envelope.error) {
121                (Some(payload), None) => Ok(payload),
122                (None, Some(error)) => Err(actr_protocol::ProtocolError::TransportError(format!(
123                    "RPC error {}: {}",
124                    error.code, error.message
125                ))),
126                _ => Err(actr_protocol::ProtocolError::DecodeError(
127                    "Invalid RpcEnvelope: payload and error fields inconsistent".to_string(),
128                )),
129            };
130            let _ = response_tx.send(result);
131        } else {
132            // Request - Enqueue to Mailbox (pass raw bytes, zero overhead)
133            drop(pending); // Release lock
134            tracing::debug!("📥 Received RPC Request: request_id={}", request_id);
135
136            // Determine priority based on PayloadType
137            let priority = match payload_type {
138                PayloadType::RpcSignal => MessagePriority::High,
139                PayloadType::RpcReliable => MessagePriority::Normal,
140                _ => MessagePriority::Normal,
141            };
142
143            // Enqueue to Mailbox (from_bytes and data are original bytes, zero overhead)
144            // Convert Bytes to Vec<u8> (Mailbox uses Vec)
145            match mailbox.enqueue(from_bytes, data.to_vec(), priority).await {
146                Ok(msg_id) => {
147                    tracing::debug!(
148                        "✅ RPC message enqueued to Mailbox: msg_id={}, priority={:?}",
149                        msg_id,
150                        priority
151                    );
152                }
153                Err(e) => {
154                    tracing::error!("❌ Mailbox enqueue failed: {:?}", e);
155                }
156            }
157        }
158    }
159
160    /// Start message receive loop (called by ActrSystem/ActrNode)
161    ///
162    /// # Arguments
163    /// - `mailbox`: message queue for persisting inbound requests
164    ///
165    /// # Architecture
166    /// According to three-loop architecture design (framework-runtime-architecture.zh.md):
167    /// - WebRtcGate belongs to outer loop (Transport layer)
168    /// - Mailbox belongs to inner loop (state path)
169    /// - Message flow: WebRTC → WebRtcGate → Mailbox/DataStreamRegistry → Scheduler → ActrNode
170    ///
171    /// # Message Routing Logic
172    /// - Route based on PayloadType:
173    ///   - RpcReliable/RpcSignal: Deserialize RpcEnvelope, check pending_requests, enqueue to Mailbox
174    ///   - StreamReliable/StreamLatencyFirst: Deserialize DataStream, dispatch to DataStreamRegistry
175    pub async fn start_receive_loop(&self, mailbox: Arc<dyn Mailbox>) -> RuntimeResult<()> {
176        let coordinator = self.coordinator.clone();
177        let pending_requests = self.pending_requests.clone();
178        let data_stream_registry = self.data_stream_registry.clone();
179
180        tokio::spawn(async move {
181            loop {
182                // Receive message from WebRtcCoordinator (now includes PayloadType)
183                match coordinator.receive_message().await {
184                    Ok(Some((from_bytes, data, payload_type))) => {
185                        tracing::debug!(
186                            "📨 WebRtcGate received message: {} bytes, PayloadType: {:?}",
187                            data.len(),
188                            payload_type
189                        );
190
191                        // Route based on PayloadType
192                        match payload_type {
193                            PayloadType::RpcReliable | PayloadType::RpcSignal => {
194                                // RPC path: deserialize RpcEnvelope and route
195                                match RpcEnvelope::decode(&data[..]) {
196                                    Ok(envelope) => {
197                                        let fut = Self::handle_envelope(
198                                            envelope,
199                                            from_bytes,
200                                            data,
201                                            payload_type,
202                                            pending_requests.clone(),
203                                            mailbox.clone(),
204                                        );
205                                        #[cfg(feature = "opentelemetry")]
206                                        {
207                                            use tracing::Instrument;
208                                            let span = tracing::info_span!("webrtc.receive_rpc");
209                                            // Note: envelope is already moved, so we can't set parent here
210                                            // The parent is set inside handle_envelope instead
211                                            fut.instrument(span).await;
212                                        }
213                                        #[cfg(not(feature = "opentelemetry"))]
214                                        {
215                                            fut.await;
216                                        }
217                                    }
218                                    Err(e) => {
219                                        tracing::error!(
220                                            "❌ Failed to deserialize RpcEnvelope: {:?}",
221                                            e
222                                        );
223                                    }
224                                }
225                            }
226                            PayloadType::StreamReliable | PayloadType::StreamLatencyFirst => {
227                                // DataStream path: deserialize and dispatch to registry
228                                match DataStream::decode(&data[..]) {
229                                    Ok(chunk) => {
230                                        tracing::debug!(
231                                            "📦 Received DataStream: stream_id={}, seq={}, {} bytes",
232                                            chunk.stream_id,
233                                            chunk.sequence,
234                                            chunk.payload.len()
235                                        );
236
237                                        // Decode sender ActrId
238                                        match ActrId::decode(&from_bytes[..]) {
239                                            Ok(sender_id) => {
240                                                // Dispatch to DataStreamRegistry (async callback invocation)
241                                                data_stream_registry
242                                                    .dispatch(chunk, sender_id)
243                                                    .await;
244                                            }
245                                            Err(e) => {
246                                                tracing::error!(
247                                                    "❌ Failed to decode sender ActrId: {:?}",
248                                                    e
249                                                );
250                                            }
251                                        }
252                                    }
253                                    Err(e) => {
254                                        tracing::error!(
255                                            "❌ Failed to deserialize DataStream: {:?}",
256                                            e
257                                        );
258                                    }
259                                }
260                            }
261                            PayloadType::MediaRtp => {
262                                tracing::warn!(
263                                    "⚠️ MediaRtp received in WebRtcGate (should use RTCTrackRemote)"
264                                );
265                            }
266                        }
267                    }
268                    Ok(None) => {
269                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
270                    }
271                    Err(e) => {
272                        tracing::error!("❌ Message receive failed: {:?}", e);
273                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
274                    }
275                }
276            }
277        });
278
279        Ok(())
280    }
281
282    /// Send response (called by Mailbox handler loop)
283    ///
284    /// # Arguments
285    /// - `target`: response target ActrId (original request sender)
286    /// - `response_envelope`: response RpcEnvelope (**must reuse original request_id**)
287    ///
288    /// # Design Principle
289    /// - Response reuses Request's request_id (caller is responsible)
290    /// - Receiver matches to pending_requests by request_id and wakes up waiting caller
291    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
292    #[instrument(skip_all)]
293    pub async fn send_response(
294        &self,
295        target: &ActrId,
296        mut response_envelope: RpcEnvelope,
297    ) -> RuntimeResult<()> {
298        // Inject tracing context before serialization
299        #[cfg(feature = "opentelemetry")]
300        {
301            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
302            inject_span_context_to_rpc(&tracing::Span::current(), &mut response_envelope);
303        }
304
305        // Serialize RpcEnvelope (Protobuf)
306        let mut buf = Vec::new();
307        response_envelope
308            .encode(&mut buf)
309            .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to encode response: {e}")))?;
310
311        // Send
312        self.coordinator.send_message(target, &buf).await?;
313        tracing::debug!(
314            "📤 Sent response: request_id={}, {} bytes",
315            response_envelope.request_id,
316            buf.len()
317        );
318        Ok(())
319    }
320}
321
322impl WebRtcGate {
323    /// Send request and wait for response (bidirectional communication)
324    ///
325    /// # Implementation Details
326    /// - RpcEnvelope's request_id uniquely identifies this RPC call
327    /// - Response will reuse the same request_id, receiver matches via pending_requests
328    /// - 30 second timeout protection
329    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
330    #[instrument(skip_all, fields(target = %target.to_string_repr(), request_id = %envelope.request_id))]
331    pub async fn send_request(
332        &self,
333        target: &ActrId,
334        mut envelope: RpcEnvelope,
335    ) -> ActorResult<Bytes> {
336        let request_id = envelope.request_id.clone();
337        tracing::debug!(
338            "📤 WebRtcGate::send_request to {:?}, request_id={}",
339            target,
340            request_id
341        );
342
343        // Create response channel and register
344        let (response_tx, response_rx) = oneshot::channel();
345        {
346            let mut pending = self.pending_requests.write().await;
347            pending.insert(request_id.clone(), response_tx);
348        }
349
350        // Inject tracing context before serialization
351        #[cfg(feature = "opentelemetry")]
352        {
353            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
354            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
355        }
356
357        // Serialize RpcEnvelope (Protobuf)
358        let mut buf = Vec::new();
359        envelope
360            .encode(&mut buf)
361            .map_err(|e| ProtocolError::SerializationError(e.to_string()))?;
362
363        // Send message
364        self.coordinator
365            .send_message(target, &buf)
366            .await
367            .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
368
369        tracing::debug!(
370            "📤 Sent request: request_id={}, {} bytes",
371            request_id,
372            buf.len()
373        );
374
375        // Wait for response (30 second timeout)
376        match tokio::time::timeout(std::time::Duration::from_secs(30), response_rx).await {
377            Ok(Ok(result)) => {
378                // result is ActorResult<Bytes>, propagate it
379                tracing::debug!("📬 Received response: request_id={}", request_id);
380                result
381            }
382            Ok(Err(_)) => {
383                // Cleanup pending request
384                self.pending_requests.write().await.remove(&request_id);
385                Err(ProtocolError::TransportError(
386                    "Response channel closed".to_string(),
387                ))
388            }
389            Err(_) => {
390                // Timeout, cleanup pending request
391                self.pending_requests.write().await.remove(&request_id);
392                Err(ProtocolError::TransportError("Request timeout".to_string()))
393            }
394        }
395    }
396
397    /// Send one-way message (no response expected)
398    ///
399    /// # Implementation Details
400    /// - Different from send_request only in that it doesn't register pending_requests and doesn't wait for response
401    /// - RpcEnvelope request_id still needs to be set (for logging and tracing)
402    #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
403    #[instrument(skip_all, fields(target = %target.to_string_repr(), request_id = %envelope.request_id))]
404    pub async fn send_message(
405        &self,
406        target: &ActrId,
407        mut envelope: RpcEnvelope,
408    ) -> ActorResult<()> {
409        let request_id = envelope.request_id.clone();
410        tracing::debug!(
411            "📤 WebRtcGate::send_message to {:?}, request_id={}",
412            target,
413            request_id
414        );
415
416        // Inject tracing context before serialization
417        #[cfg(feature = "opentelemetry")]
418        {
419            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
420            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
421        }
422
423        // Serialize RpcEnvelope (Protobuf)
424        let mut buf = Vec::new();
425        envelope
426            .encode(&mut buf)
427            .map_err(|e| ProtocolError::SerializationError(e.to_string()))?;
428
429        // Send message (don't register pending_requests)
430        self.coordinator
431            .send_message(target, &buf)
432            .await
433            .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
434
435        tracing::debug!(
436            "📤 Sent one-way message: request_id={}, {} bytes",
437            request_id,
438            buf.len()
439        );
440        Ok(())
441    }
442}