actr_runtime/wire/webrtc/
gate.rs

1//! WebRtcGate - WebRTC-based OutboundGate implementation
2//!
3//! Uses WebRtcCoordinator to send/receive messages, implementing cross-process RPC communication
4
5use super::coordinator::WebRtcCoordinator;
6use crate::error::{RuntimeError, RuntimeResult};
7use crate::inbound::DataStreamRegistry;
8#[cfg(feature = "opentelemetry")]
9use crate::wire::webrtc::trace::set_parent_from_rpc_envelope;
10use actr_framework::Bytes;
11use actr_mailbox::{Mailbox, MessagePriority};
12use actr_protocol::prost::Message as ProstMessage;
13use actr_protocol::{self, ActrId, ActrIdExt, DataStream, PayloadType, RpcEnvelope};
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{RwLock, oneshot};
17#[cfg(feature = "opentelemetry")]
18use tracing::Instrument as _;
19
20/// WebRTC Gate - OutboundGate implementation
21///
22/// # Responsibilities
23/// - Implement OutboundGate trait
24/// - Send messages using WebRtcCoordinator
25/// - Serialize/deserialize RpcEnvelope (Protobuf)
26/// - Track pending requests and match responses (by r  equest_id)
27/// - Route messages by PayloadType (RPC → Mailbox, DataStream → Registry)
28///
29/// # Design Principles
30/// - Response reuses Request's request_id (standard RPC semantics)
31/// - Use pending_requests to distinguish: exists = Response, doesn't exist = Request
32/// - Gateway layer doesn't deserialize payloads, raw bytes go directly to Mailbox
33/// - **IMPORTANT**: pending_requests should be shared with OutprocOutGate
34pub struct WebRtcGate {
35    /// Local Actor ID
36    local_id: Arc<RwLock<Option<ActrId>>>,
37
38    /// WebRTC signaling coordinator
39    coordinator: Arc<WebRtcCoordinator>,
40
41    /// Pending requests (request_id → (target_actor_id, response channel))
42    /// Used to determine if received message is Response (key exists) or Request (key doesn't exist)
43    /// **Shared with OutprocOutGate** to ensure correct Response routing
44    /// Can send success (Ok(Bytes)) or error (Err(ProtocolError))
45    pending_requests:
46        Arc<RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>>,
47
48    /// DataStream registry for fast-path message routing
49    data_stream_registry: Arc<DataStreamRegistry>,
50}
51
52impl WebRtcGate {
53    /// Create new WebRtcGate with shared pending_requests and DataStreamRegistry
54    ///
55    /// # Arguments
56    /// - `coordinator`: WebRtcCoordinator instance
57    /// - `pending_requests`: Shared pending requests (should be same as OutprocOutGate)
58    /// - `data_stream_registry`: DataStream registry for fast-path routing
59    pub fn new(
60        coordinator: Arc<WebRtcCoordinator>,
61        pending_requests: Arc<
62            RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>,
63        >,
64        data_stream_registry: Arc<DataStreamRegistry>,
65    ) -> Self {
66        Self {
67            local_id: Arc::new(RwLock::new(None)),
68            coordinator,
69            pending_requests,
70            data_stream_registry,
71        }
72    }
73
74    /// Set local Actor ID
75    pub async fn set_local_id(&self, actor_id: ActrId) {
76        *self.local_id.write().await = Some(actor_id);
77    }
78
79    /// Handle RpcEnvelope message (Response or Request)
80    ///
81    /// # Arguments
82    /// - `envelope`: Deserialized RpcEnvelope
83    /// - `from_bytes`: Sender's ActrId bytes (for Mailbox enqueue)
84    /// - `data`: Original message bytes (for Mailbox enqueue)
85    /// - `payload_type`: PayloadType to determine priority
86    /// - `pending_requests`: Shared pending requests map
87    /// - `mailbox`: Mailbox for enqueueing requests
88    ///
89    /// # Behavior
90    /// - If request_id exists in pending_requests: Response → wake up waiting caller
91    /// - If request_id doesn't exist: Request → enqueue to Mailbox
92    async fn handle_envelope(
93        envelope: RpcEnvelope,
94        from_bytes: Vec<u8>,
95        data: Bytes,
96        payload_type: PayloadType,
97        pending_requests: Arc<
98            RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>,
99        >,
100        mailbox: Arc<dyn Mailbox>,
101    ) {
102        // Extract and set tracing context from envelope
103        #[cfg(feature = "opentelemetry")]
104        {
105            use crate::wire::webrtc::trace::set_parent_from_rpc_envelope;
106            let span = tracing::info_span!("webrtc.receive_rpc", request_id = %envelope.request_id);
107            set_parent_from_rpc_envelope(&span, &envelope);
108            let _guard = span.enter();
109        }
110        let request_id = envelope.request_id.clone();
111
112        // Determine if Response or Request
113        let mut pending = pending_requests.write().await;
114        if let Some((target, response_tx)) = pending.remove(&request_id) {
115            // Response - Wake up waiting caller (bypassing disk, fast path)
116            drop(pending); // Release lock
117            tracing::debug!(
118                "📬 Received RPC Response: request_id={}, target={}",
119                request_id,
120                target.to_string_repr()
121            );
122
123            // Convert envelope to result
124            let result = match (envelope.payload, envelope.error) {
125                (Some(payload), None) => Ok(payload),
126                (None, Some(error)) => Err(actr_protocol::ProtocolError::TransportError(format!(
127                    "RPC error {}: {}",
128                    error.code, error.message
129                ))),
130                _ => Err(actr_protocol::ProtocolError::DecodeError(
131                    "Invalid RpcEnvelope: payload and error fields inconsistent".to_string(),
132                )),
133            };
134            let _ = response_tx.send(result);
135        } else {
136            // Request - Enqueue to Mailbox (pass raw bytes, zero overhead)
137            drop(pending); // Release lock
138            tracing::debug!("📥 Received RPC Request: request_id={}", request_id);
139
140            // Determine priority based on PayloadType
141            let priority = match payload_type {
142                PayloadType::RpcSignal => MessagePriority::High,
143                PayloadType::RpcReliable => MessagePriority::Normal,
144                _ => MessagePriority::Normal,
145            };
146
147            // Enqueue to Mailbox (from_bytes and data are original bytes, zero overhead)
148            // Convert Bytes to Vec<u8> (Mailbox uses Vec)
149            match mailbox.enqueue(from_bytes, data.to_vec(), priority).await {
150                Ok(msg_id) => {
151                    tracing::debug!(
152                        "✅ RPC message enqueued to Mailbox: msg_id={}, priority={:?}",
153                        msg_id,
154                        priority
155                    );
156                }
157                Err(e) => {
158                    tracing::error!("❌ Mailbox enqueue failed: {:?}", e);
159                }
160            }
161        }
162    }
163
164    /// Start message receive loop (called by ActrSystem/ActrNode)
165    ///
166    /// # Arguments
167    /// - `mailbox`: message queue for persisting inbound requests
168    ///
169    /// # Architecture
170    /// According to three-loop architecture design (framework-runtime-architecture.zh.md):
171    /// - WebRtcGate belongs to outer loop (Transport layer)
172    /// - Mailbox belongs to inner loop (state path)
173    /// - Message flow: WebRTC → WebRtcGate → Mailbox/DataStreamRegistry → Scheduler → ActrNode
174    ///
175    /// # Message Routing Logic
176    /// - Route based on PayloadType:
177    ///   - RpcReliable/RpcSignal: Deserialize RpcEnvelope, check pending_requests, enqueue to Mailbox
178    ///   - StreamReliable/StreamLatencyFirst: Deserialize DataStream, dispatch to DataStreamRegistry
179    pub async fn start_receive_loop(&self, mailbox: Arc<dyn Mailbox>) -> RuntimeResult<()> {
180        let coordinator = self.coordinator.clone();
181        let pending_requests = self.pending_requests.clone();
182        let data_stream_registry = self.data_stream_registry.clone();
183
184        tokio::spawn(async move {
185            loop {
186                // Receive message from WebRtcCoordinator (now includes PayloadType)
187                match coordinator.receive_message().await {
188                    Ok(Some((from_bytes, data, payload_type))) => {
189                        tracing::debug!(
190                            "📨 WebRtcGate received message: {} bytes, PayloadType: {:?}",
191                            data.len(),
192                            payload_type
193                        );
194
195                        // Route based on PayloadType
196                        match payload_type {
197                            PayloadType::RpcReliable | PayloadType::RpcSignal => {
198                                // RPC path: deserialize RpcEnvelope and route
199                                match RpcEnvelope::decode(&data[..]) {
200                                    Ok(envelope) => {
201                                        #[cfg(feature = "opentelemetry")]
202                                        let span = {
203                                            let span = tracing::info_span!("webrtc.receive_rpc");
204                                            set_parent_from_rpc_envelope(&span, &envelope);
205                                            span
206                                        };
207                                        let handle_envelope_fut = Self::handle_envelope(
208                                            envelope,
209                                            from_bytes,
210                                            data,
211                                            payload_type,
212                                            pending_requests.clone(),
213                                            mailbox.clone(),
214                                        );
215                                        #[cfg(feature = "opentelemetry")]
216                                        let handle_envelope_fut =
217                                            handle_envelope_fut.instrument(span);
218
219                                        handle_envelope_fut.await;
220                                    }
221                                    Err(e) => {
222                                        tracing::error!(
223                                            "❌ Failed to deserialize RpcEnvelope: {:?}",
224                                            e
225                                        );
226                                    }
227                                }
228                            }
229                            PayloadType::StreamReliable | PayloadType::StreamLatencyFirst => {
230                                // DataStream path: deserialize and dispatch to registry
231                                match DataStream::decode(&data[..]) {
232                                    Ok(chunk) => {
233                                        tracing::debug!(
234                                            "📦 Received DataStream: stream_id={}, seq={}, {} bytes",
235                                            chunk.stream_id,
236                                            chunk.sequence,
237                                            chunk.payload.len()
238                                        );
239
240                                        // Decode sender ActrId
241                                        match ActrId::decode(&from_bytes[..]) {
242                                            Ok(sender_id) => {
243                                                // Dispatch to DataStreamRegistry (async callback invocation)
244                                                data_stream_registry
245                                                    .dispatch(chunk, sender_id)
246                                                    .await;
247                                            }
248                                            Err(e) => {
249                                                tracing::error!(
250                                                    "❌ Failed to decode sender ActrId: {:?}",
251                                                    e
252                                                );
253                                            }
254                                        }
255                                    }
256                                    Err(e) => {
257                                        tracing::error!(
258                                            "❌ Failed to deserialize DataStream: {:?}",
259                                            e
260                                        );
261                                    }
262                                }
263                            }
264                            PayloadType::MediaRtp => {
265                                tracing::warn!(
266                                    "⚠️ MediaRtp received in WebRtcGate (should use RTCTrackRemote)"
267                                );
268                            }
269                        }
270                    }
271                    Ok(None) => {
272                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
273                    }
274                    Err(e) => {
275                        tracing::error!("❌ Message receive failed: {:?}", e);
276                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
277                    }
278                }
279            }
280        });
281
282        Ok(())
283    }
284
285    /// Send response (called by Mailbox handler loop)
286    ///
287    /// # Arguments
288    /// - `target`: response target ActrId (original request sender)
289    /// - `response_envelope`: response RpcEnvelope (**must reuse original request_id**)
290    ///
291    /// # Design Principle
292    /// - Response reuses Request's request_id (caller is responsible)
293    /// - Receiver matches to pending_requests by request_id and wakes up waiting caller
294    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
295    pub async fn send_response(
296        &self,
297        target: &ActrId,
298        response_envelope: RpcEnvelope,
299    ) -> RuntimeResult<()> {
300        // Serialize RpcEnvelope (Protobuf)
301        let mut buf = Vec::new();
302        response_envelope
303            .encode(&mut buf)
304            .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to encode response: {e}")))?;
305
306        // Send
307        self.coordinator.send_message(target, &buf).await?;
308        tracing::debug!(
309            "📤 Sent response: request_id={}, {} bytes",
310            response_envelope.request_id,
311            buf.len()
312        );
313        Ok(())
314    }
315}