actr_runtime/outbound/
outproc_out_gate.rs

//! OutprocOutGate - Outproc transport adapter (outbound)
//!
//! # Responsibilities
//! - Wrap OutprocTransportManager (Protobuf serialization)
//! - Used for cross-process communication (WebRTC + WebSocket)
//! - Maintain pending_requests (Request/Response matching)
7
8use crate::transport::{Dest, OutprocTransportManager};
9use actr_framework::{Bytes, MediaSample};
10use actr_protocol::prost::Message as ProstMessage;
11use actr_protocol::{ActorResult, ActrId, PayloadType, ProtocolError, RpcEnvelope};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::{RwLock, oneshot};
15
16/// OutprocOutGate - Outproc transport adapter (outbound)
17///
18/// # Features
19/// - Protobuf serialization: serialize RpcEnvelope to byte stream
20/// - Defaults to PayloadType::RpcReliable for RPC messages
21/// - Maintain pending_requests for Request/Response matching
22/// - Support MediaTrack sending via WebRTC
23pub struct OutprocOutGate {
24    /// OutprocTransportManager instance
25    transport_manager: Arc<OutprocTransportManager>,
26
27    /// Pending requests: request_id → oneshot::Sender<Bytes>
28    /// Pending requests (can receive success or error)
29    pending_requests:
30        Arc<RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>>,
31
32    /// WebRTC coordinator (optional, for MediaTrack support)
33    webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
34}
35
36impl OutprocOutGate {
37    /// Create new OutprocOutGate
38    ///
39    /// # Arguments
40    /// - `transport_manager`: OutprocTransportManager instance
41    /// - `webrtc_coordinator`: Optional WebRTC coordinator for MediaTrack support
42    pub fn new(
43        transport_manager: Arc<OutprocTransportManager>,
44        webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
45    ) -> Self {
46        Self {
47            transport_manager,
48            pending_requests: Arc::new(RwLock::new(HashMap::new())),
49            webrtc_coordinator,
50        }
51    }
52
53    /// Handle response message (called by MessageDispatcher)
54    ///
55    /// # Arguments
56    /// - `request_id`: Request ID
57    /// - `result`: Response data (Ok) or error (Err)
58    ///
59    /// # Returns
60    /// - `Ok(true)`: Successfully woke up waiting request
61    /// - `Ok(false)`: No corresponding pending request found
62    pub async fn handle_response(
63        &self,
64        request_id: &str,
65        result: actr_protocol::ActorResult<Bytes>,
66    ) -> ActorResult<bool> {
67        let mut pending = self.pending_requests.write().await;
68
69        if let Some(tx) = pending.remove(request_id) {
70            // Wake up waiting request with result (success or error)
71            let _ = tx.send(result);
72            tracing::debug!("✅ Completed request: {}", request_id);
73            Ok(true)
74        } else {
75            tracing::warn!("⚠️  No pending request for: {}", request_id);
76            Ok(false)
77        }
78    }
79
80    /// Get pending requests count (for monitoring)
81    pub async fn pending_count(&self) -> usize {
82        self.pending_requests.read().await.len()
83    }
84
85    /// Get pending_requests reference (for WebRtcGate to share)
86    pub fn get_pending_requests(
87        &self,
88    ) -> Arc<RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>> {
89        self.pending_requests.clone()
90    }
91
92    /// Convert ActrId to Dest
93    fn actr_id_to_dest(actor_id: &ActrId) -> Dest {
94        Dest::actor(actor_id.clone())
95    }
96
97    /// Serialize RpcEnvelope to bytes
98    fn serialize_envelope(envelope: &RpcEnvelope) -> Vec<u8> {
99        envelope.encode_to_vec()
100    }
101}
102
103impl OutprocOutGate {
104    /// Send request and wait for response (bidirectional communication)
105    pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
106        tracing::debug!(
107            "📤 OutprocGate::send_request to {:?}, request_id={}",
108            target,
109            envelope.request_id
110        );
111
112        // 1. Create oneshot channel for receiving response
113        let (response_tx, response_rx) = oneshot::channel();
114
115        // 2. Register pending request
116        {
117            let mut pending = self.pending_requests.write().await;
118            pending.insert(envelope.request_id.clone(), response_tx);
119        }
120
121        // 3. Serialize RpcEnvelope
122        let data = Self::serialize_envelope(&envelope);
123
124        // 4. Convert ActrId to Dest
125        let dest = Self::actr_id_to_dest(target);
126
127        // 5. Send message (defaults to PayloadType::RpcReliable)
128        match self
129            .transport_manager
130            .send(&dest, PayloadType::RpcReliable, &data)
131            .await
132        {
133            Ok(_) => {
134                tracing::debug!("✅ Sent request to {:?}", target);
135            }
136            Err(e) => {
137                // Send failed, remove pending request
138                self.pending_requests
139                    .write()
140                    .await
141                    .remove(&envelope.request_id);
142                return Err(ProtocolError::TransportError(e.to_string()));
143            }
144        }
145
146        // 6. Wait for response (timeout from envelope.timeout_ms)
147        let timeout = std::time::Duration::from_millis(envelope.timeout_ms as u64);
148
149        match tokio::time::timeout(timeout, response_rx).await {
150            Ok(Ok(result)) => {
151                // result is ActorResult<Bytes>, propagate it
152                tracing::debug!("✅ Received response for request: {}", envelope.request_id);
153                result
154            }
155            Ok(Err(_)) => {
156                // oneshot channel closed (shouldn't happen)
157                Err(ProtocolError::TransportError(
158                    "Response channel closed".to_string(),
159                ))
160            }
161            Err(_) => {
162                // Timeout
163                self.pending_requests
164                    .write()
165                    .await
166                    .remove(&envelope.request_id);
167                Err(ProtocolError::TransportError(format!(
168                    "Request timeout: {}ms",
169                    envelope.timeout_ms
170                )))
171            }
172        }
173    }
174
175    /// Send one-way message (no response expected)
176    pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
177        tracing::debug!("📤 OutprocGate::send_message to {:?}", target);
178
179        // 1. Serialize RpcEnvelope
180        let data = Self::serialize_envelope(&envelope);
181
182        // 2. Convert ActrId to Dest
183        let dest = Self::actr_id_to_dest(target);
184
185        // 3. Send message (defaults to PayloadType::RpcReliable)
186        self.transport_manager
187            .send(&dest, PayloadType::RpcReliable, &data)
188            .await
189            .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
190
191        tracing::debug!("✅ Sent message to {:?}", target);
192        Ok(())
193    }
194
195    /// Send media sample via WebRTC native track
196    ///
197    /// # Parameters
198    /// - `target`: Target Actor ID
199    /// - `track_id`: Media track identifier
200    /// - `sample`: Media sample data
201    ///
202    /// # Implementation Note
203    /// Delegates to WebRtcCoordinator which manages WebRTC Tracks
204    pub async fn send_media_sample(
205        &self,
206        target: &ActrId,
207        track_id: &str,
208        sample: MediaSample,
209    ) -> ActorResult<()> {
210        tracing::debug!(
211            "📤 OutprocGate::send_media_sample to {:?}, track_id={}",
212            target,
213            track_id
214        );
215
216        // Check if WebRTC coordinator is available
217        let coordinator = self.webrtc_coordinator.as_ref().ok_or_else(|| {
218            ProtocolError::Actr(actr_protocol::ActrError::NotImplemented {
219                feature: "MediaTrack requires WebRTC coordinator".to_string(),
220            })
221        })?;
222
223        // Delegate to WebRtcCoordinator
224        coordinator
225            .send_media_sample(target, track_id, sample)
226            .await
227            .map_err(|e| ProtocolError::TransportError(format!("WebRTC send failed: {e}")))?;
228
229        tracing::debug!("✅ Sent media sample to {:?}", target);
230        Ok(())
231    }
232
233    /// Send DataStream (Fast Path)
234    ///
235    /// # Parameters
236    /// - `target`: Target Actor ID
237    /// - `payload_type`: PayloadType (StreamReliable or StreamLatencyFirst)
238    /// - `data`: Serialized DataStream bytes
239    ///
240    /// # Implementation Note
241    /// Sends via OutprocTransportManager using WebRTC DataChannel or WebSocket
242    pub async fn send_data_stream(
243        &self,
244        target: &ActrId,
245        payload_type: PayloadType,
246        data: Bytes,
247    ) -> ActorResult<()> {
248        tracing::debug!(
249            "📤 OutprocGate::send_data_stream to {:?}, payload_type={:?}, size={} bytes",
250            target,
251            payload_type,
252            data.len()
253        );
254
255        // Convert ActrId to Dest
256        let dest = Self::actr_id_to_dest(target);
257
258        // Send via transport manager
259        self.transport_manager
260            .send(&dest, payload_type, &data)
261            .await
262            .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
263
264        tracing::debug!("✅ Sent DataStream to {:?}", target);
265        Ok(())
266    }
267}
268
269impl Drop for OutprocOutGate {
270    fn drop(&mut self) {
271        tracing::debug!("🗑️  OutprocGate dropped");
272    }
273}