// actr_runtime/wire/webrtc/gate.rs

//! WebRtcGate - WebRTC-based OutboundGate implementation
//!
//! Uses WebRtcCoordinator to send/receive messages, implementing cross-process RPC communication
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{RwLock, oneshot};
8
9use super::coordinator::WebRtcCoordinator;
10use crate::error::{RuntimeError, RuntimeResult};
11use crate::inbound::DataStreamRegistry;
12use actr_framework::Bytes;
13use actr_mailbox::{Mailbox, MessagePriority};
14use actr_protocol::prost::Message as ProstMessage;
15use actr_protocol::{
16    self, ActorResult, ActrId, DataStream, PayloadType, ProtocolError, RpcEnvelope,
17};
18
19/// WebRTC Gate - OutboundGate implementation
20///
21/// # Responsibilities
22/// - Implement OutboundGate trait
23/// - Send messages using WebRtcCoordinator
24/// - Serialize/deserialize RpcEnvelope (Protobuf)
25/// - Track pending requests and match responses (by request_id)
26/// - Route messages by PayloadType (RPC → Mailbox, DataStream → Registry)
27///
28/// # Design Principles
29/// - Response reuses Request's request_id (standard RPC semantics)
30/// - Use pending_requests to distinguish: exists = Response, doesn't exist = Request
31/// - Gateway layer doesn't deserialize payloads, raw bytes go directly to Mailbox
32/// - **IMPORTANT**: pending_requests should be shared with OutprocOutGate
33pub struct WebRtcGate {
34    /// Local Actor ID
35    local_id: Arc<RwLock<Option<ActrId>>>,
36
37    /// WebRTC signaling coordinator
38    coordinator: Arc<WebRtcCoordinator>,
39
40    /// Pending requests (request_id → response channel)
41    /// Used to determine if received message is Response (key exists) or Request (key doesn't exist)
42    /// **Shared with OutprocOutGate** to ensure correct Response routing
43    /// Can send success (Ok(Bytes)) or error (Err(ProtocolError))
44    pending_requests:
45        Arc<RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>>,
46
47    /// DataStream registry for fast-path message routing
48    data_stream_registry: Arc<DataStreamRegistry>,
49}
50
51impl WebRtcGate {
52    /// Create new WebRtcGate with shared pending_requests and DataStreamRegistry
53    ///
54    /// # Arguments
55    /// - `coordinator`: WebRtcCoordinator instance
56    /// - `pending_requests`: Shared pending requests (should be same as OutprocOutGate)
57    /// - `data_stream_registry`: DataStream registry for fast-path routing
58    pub fn new(
59        coordinator: Arc<WebRtcCoordinator>,
60        pending_requests: Arc<
61            RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>,
62        >,
63        data_stream_registry: Arc<DataStreamRegistry>,
64    ) -> Self {
65        Self {
66            local_id: Arc::new(RwLock::new(None)),
67            coordinator,
68            pending_requests,
69            data_stream_registry,
70        }
71    }
72
73    /// Set local Actor ID
74    pub async fn set_local_id(&self, actor_id: ActrId) {
75        *self.local_id.write().await = Some(actor_id);
76    }
77
78    /// Start message receive loop (called by ActrSystem/ActrNode)
79    ///
80    /// # Arguments
81    /// - `mailbox`: message queue for persisting inbound requests
82    ///
83    /// # Architecture
84    /// According to three-loop architecture design (framework-runtime-architecture.zh.md):
85    /// - WebRtcGate belongs to outer loop (Transport layer)
86    /// - Mailbox belongs to inner loop (state path)
87    /// - Message flow: WebRTC → WebRtcGate → Mailbox/DataStreamRegistry → Scheduler → ActrNode
88    ///
89    /// # Message Routing Logic
90    /// - Route based on PayloadType:
91    ///   - RpcReliable/RpcSignal: Deserialize RpcEnvelope, check pending_requests, enqueue to Mailbox
92    ///   - StreamReliable/StreamLatencyFirst: Deserialize DataStream, dispatch to DataStreamRegistry
93    pub async fn start_receive_loop(&self, mailbox: Arc<dyn Mailbox>) -> RuntimeResult<()> {
94        let coordinator = self.coordinator.clone();
95        let pending_requests = self.pending_requests.clone();
96        let data_stream_registry = self.data_stream_registry.clone();
97
98        tokio::spawn(async move {
99            loop {
100                // Receive message from WebRtcCoordinator (now includes PayloadType)
101                match coordinator.receive_message().await {
102                    Ok(Some((from_bytes, data, payload_type))) => {
103                        tracing::debug!(
104                            "📨 WebRtcGate received message: {} bytes, PayloadType: {:?}",
105                            data.len(),
106                            payload_type
107                        );
108
109                        // Route based on PayloadType
110                        match payload_type {
111                            PayloadType::RpcReliable | PayloadType::RpcSignal => {
112                                // RPC path: deserialize RpcEnvelope and route
113                                match RpcEnvelope::decode(&data[..]) {
114                                    Ok(envelope) => {
115                                        let request_id = envelope.request_id.clone();
116
117                                        // Determine if Response or Request
118                                        let mut pending = pending_requests.write().await;
119                                        if let Some(response_tx) = pending.remove(&request_id) {
120                                            // Response - Wake up waiting caller (bypassing disk, fast path)
121                                            drop(pending); // Release lock
122                                            tracing::debug!(
123                                                "📬 Received RPC Response: request_id={}",
124                                                request_id
125                                            );
126
127                                            // Convert envelope to result
128                                            let result = match (envelope.payload, envelope.error) {
129                                                (Some(payload), None) => Ok(payload),
130                                                (None, Some(error)) => {
131                                                    Err(actr_protocol::ProtocolError::TransportError(
132                                                        format!(
133                                                            "RPC error {}: {}",
134                                                            error.code, error.message
135                                                        ),
136                                                    ))
137                                                }
138                                                _ => Err(actr_protocol::ProtocolError::DecodeError(
139                                                    "Invalid RpcEnvelope: payload and error fields inconsistent"
140                                                        .to_string(),
141                                                )),
142                                            };
143                                            let _ = response_tx.send(result);
144                                        } else {
145                                            // Request - Enqueue to Mailbox (pass raw bytes, zero overhead)
146                                            drop(pending); // Release lock
147                                            tracing::debug!(
148                                                "📥 Received RPC Request: request_id={}",
149                                                request_id
150                                            );
151
152                                            // Determine priority based on PayloadType
153                                            let priority = match payload_type {
154                                                PayloadType::RpcSignal => MessagePriority::High,
155                                                PayloadType::RpcReliable => MessagePriority::Normal,
156                                                _ => MessagePriority::Normal,
157                                            };
158
159                                            // Enqueue to Mailbox (from_bytes and data are original bytes, zero overhead)
160                                            // Convert Bytes to Vec<u8> (Mailbox uses Vec)
161                                            match mailbox
162                                                .enqueue(
163                                                    from_bytes.clone(),
164                                                    data.to_vec(),
165                                                    priority,
166                                                )
167                                                .await
168                                            {
169                                                Ok(msg_id) => {
170                                                    tracing::debug!(
171                                                        "✅ RPC message enqueued to Mailbox: msg_id={}, priority={:?}",
172                                                        msg_id,
173                                                        priority
174                                                    );
175                                                }
176                                                Err(e) => {
177                                                    tracing::error!(
178                                                        "❌ Mailbox enqueue failed: {:?}",
179                                                        e
180                                                    );
181                                                }
182                                            }
183                                        }
184                                    }
185                                    Err(e) => {
186                                        tracing::error!(
187                                            "❌ Failed to deserialize RpcEnvelope: {:?}",
188                                            e
189                                        );
190                                    }
191                                }
192                            }
193                            PayloadType::StreamReliable | PayloadType::StreamLatencyFirst => {
194                                // DataStream path: deserialize and dispatch to registry
195                                match DataStream::decode(&data[..]) {
196                                    Ok(chunk) => {
197                                        tracing::debug!(
198                                            "📦 Received DataStream: stream_id={}, seq={}, {} bytes",
199                                            chunk.stream_id,
200                                            chunk.sequence,
201                                            chunk.payload.len()
202                                        );
203
204                                        // Decode sender ActrId
205                                        match ActrId::decode(&from_bytes[..]) {
206                                            Ok(sender_id) => {
207                                                // Dispatch to DataStreamRegistry (async callback invocation)
208                                                data_stream_registry
209                                                    .dispatch(chunk, sender_id)
210                                                    .await;
211                                            }
212                                            Err(e) => {
213                                                tracing::error!(
214                                                    "❌ Failed to decode sender ActrId: {:?}",
215                                                    e
216                                                );
217                                            }
218                                        }
219                                    }
220                                    Err(e) => {
221                                        tracing::error!(
222                                            "❌ Failed to deserialize DataStream: {:?}",
223                                            e
224                                        );
225                                    }
226                                }
227                            }
228                            PayloadType::MediaRtp => {
229                                tracing::warn!(
230                                    "⚠️ MediaRtp received in WebRtcGate (should use RTCTrackRemote)"
231                                );
232                            }
233                        }
234                    }
235                    Ok(None) => {
236                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
237                    }
238                    Err(e) => {
239                        tracing::error!("❌ Message receive failed: {:?}", e);
240                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
241                    }
242                }
243            }
244        });
245
246        Ok(())
247    }
248
249    /// Send response (called by Mailbox handler loop)
250    ///
251    /// # Arguments
252    /// - `target`: response target ActrId (original request sender)
253    /// - `response_envelope`: response RpcEnvelope (**must reuse original request_id**)
254    ///
255    /// # Design Principle
256    /// - Response reuses Request's request_id (caller is responsible)
257    /// - Receiver matches to pending_requests by request_id and wakes up waiting caller
258    pub async fn send_response(
259        &self,
260        target: &ActrId,
261        response_envelope: RpcEnvelope,
262    ) -> RuntimeResult<()> {
263        // Serialize RpcEnvelope (Protobuf)
264        let mut buf = Vec::new();
265        response_envelope
266            .encode(&mut buf)
267            .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to encode response: {e}")))?;
268
269        // Send
270        self.coordinator.send_message(target, &buf).await?;
271        tracing::debug!(
272            "📤 Sent response: request_id={}, {} bytes",
273            response_envelope.request_id,
274            buf.len()
275        );
276        Ok(())
277    }
278}
279
280impl WebRtcGate {
281    /// Send request and wait for response (bidirectional communication)
282    ///
283    /// # Implementation Details
284    /// - RpcEnvelope's request_id uniquely identifies this RPC call
285    /// - Response will reuse the same request_id, receiver matches via pending_requests
286    /// - 30 second timeout protection
287    pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
288        let request_id = envelope.request_id.clone();
289        tracing::debug!(
290            "📤 WebRtcGate::send_request to {:?}, request_id={}",
291            target,
292            request_id
293        );
294
295        // Create response channel and register
296        let (response_tx, response_rx) = oneshot::channel();
297        {
298            let mut pending = self.pending_requests.write().await;
299            pending.insert(request_id.clone(), response_tx);
300        }
301
302        // Serialize RpcEnvelope (Protobuf)
303        let mut buf = Vec::new();
304        envelope
305            .encode(&mut buf)
306            .map_err(|e| ProtocolError::SerializationError(e.to_string()))?;
307
308        // Send message
309        self.coordinator
310            .send_message(target, &buf)
311            .await
312            .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
313
314        tracing::debug!(
315            "📤 Sent request: request_id={}, {} bytes",
316            request_id,
317            buf.len()
318        );
319
320        // Wait for response (30 second timeout)
321        match tokio::time::timeout(std::time::Duration::from_secs(30), response_rx).await {
322            Ok(Ok(result)) => {
323                // result is ActorResult<Bytes>, propagate it
324                tracing::debug!("📬 Received response: request_id={}", request_id);
325                result
326            }
327            Ok(Err(_)) => {
328                // Cleanup pending request
329                self.pending_requests.write().await.remove(&request_id);
330                Err(ProtocolError::TransportError(
331                    "Response channel closed".to_string(),
332                ))
333            }
334            Err(_) => {
335                // Timeout, cleanup pending request
336                self.pending_requests.write().await.remove(&request_id);
337                Err(ProtocolError::TransportError("Request timeout".to_string()))
338            }
339        }
340    }
341
342    /// Send one-way message (no response expected)
343    ///
344    /// # Implementation Details
345    /// - Different from send_request only in that it doesn't register pending_requests and doesn't wait for response
346    /// - RpcEnvelope request_id still needs to be set (for logging and tracing)
347    pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
348        let request_id = envelope.request_id.clone();
349        tracing::debug!(
350            "📤 WebRtcGate::send_message to {:?}, request_id={}",
351            target,
352            request_id
353        );
354
355        // Serialize RpcEnvelope (Protobuf)
356        let mut buf = Vec::new();
357        envelope
358            .encode(&mut buf)
359            .map_err(|e| ProtocolError::SerializationError(e.to_string()))?;
360
361        // Send message (don't register pending_requests)
362        self.coordinator
363            .send_message(target, &buf)
364            .await
365            .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
366
367        tracing::debug!(
368            "📤 Sent one-way message: request_id={}, {} bytes",
369            request_id,
370            buf.len()
371        );
372        Ok(())
373    }
374}