actr_runtime/outbound/
outproc_out_gate.rs

1//! OutprocOutGate - Outproc transport adapter (outbound)
2//!
3//! # Responsibilities
4//! - Wrap OutprocTransportManager (Protobuf serialization)
5//! - Used for cross-process communication (WebRTC + WebSocket)
6//! - Maintain pending_requests (Request/Response matching)
7//! - Block new requests to peers being cleaned up (closing_peers)
8
9use crate::transport::connection_event::{ConnectionEvent, ConnectionState};
10use crate::transport::{Dest, OutprocTransportManager};
11use actr_framework::{Bytes, MediaSample};
12use actr_protocol::prost::Message as ProstMessage;
13use actr_protocol::{ActorResult, ActrId, ActrIdExt, PayloadType, ProtocolError, RpcEnvelope};
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use tokio::sync::{RwLock, broadcast, oneshot};
17
/// OutprocOutGate - Outproc transport adapter (outbound)
///
/// # Features
/// - Protobuf serialization: serialize RpcEnvelope to byte stream
/// - Defaults to PayloadType::RpcReliable for RPC messages
/// - Maintain pending_requests for Request/Response matching
/// - Support MediaTrack sending via WebRTC
/// - Track peers being cleaned up (closing_peers); the set is maintained by the
///   connection-event listener, but the send paths do not consult it yet
pub struct OutprocOutGate {
    /// OutprocTransportManager instance; every outbound byte payload is handed to it
    transport_manager: Arc<OutprocTransportManager>,

    /// Pending requests: request_id → (target_actor_id, oneshot::Sender<ActorResult<Bytes>>)
    /// Stores both the target ActorId and the response sender so that all requests
    /// addressed to one peer can be found and failed when that peer's connection closes
    pending_requests:
        Arc<RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>>,

    /// WebRTC coordinator (optional, for MediaTrack support); when present its
    /// connection events also drive the cleanup listener spawned in `new`
    webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,

    #[allow(unused)]
    /// TODO: peers currently being cleaned up (intended to block new requests).
    /// The event listener inserts and removes entries here and fails the pending
    /// requests of closed peers; the field is never read through `self`, hence the
    /// `allow(unused)` above.
    closing_peers: Arc<RwLock<HashSet<ActrId>>>,
}
42
43impl OutprocOutGate {
44    /// Create new OutprocOutGate
45    ///
46    /// # Arguments
47    /// - `transport_manager`: OutprocTransportManager instance
48    /// - `webrtc_coordinator`: Optional WebRTC coordinator for MediaTrack support
49    pub fn new(
50        transport_manager: Arc<OutprocTransportManager>,
51        webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
52    ) -> Self {
53        let closing_peers = Arc::new(RwLock::new(HashSet::new()));
54        let pending_requests = Arc::new(RwLock::new(HashMap::new()));
55
56        // Start event listener if coordinator is available
57        // This is the ONLY event subscriber - it triggers top-down cleanup
58        if let Some(ref coordinator) = webrtc_coordinator {
59            Self::spawn_event_listener(
60                coordinator.subscribe_events(),
61                Arc::clone(&pending_requests),
62                Arc::clone(&closing_peers),
63                Arc::clone(&transport_manager),
64            );
65        }
66
67        Self {
68            transport_manager,
69            pending_requests,
70            webrtc_coordinator,
71            closing_peers,
72        }
73    }
74
75    /// Spawn event listener task to handle connection events
76    ///
77    /// This is the **ONLY** event subscriber in the cleanup chain.
78    /// It triggers top-down cleanup by calling transport_manager.close_transport().
79    fn spawn_event_listener(
80        mut event_rx: broadcast::Receiver<ConnectionEvent>,
81        pending_requests: Arc<
82            RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>,
83        >,
84        closing_peers: Arc<RwLock<HashSet<ActrId>>>,
85        transport_manager: Arc<OutprocTransportManager>,
86    ) {
87        tokio::spawn(async move {
88            while let Ok(event) = event_rx.recv().await {
89                tracing::debug!("๐Ÿ”„ OutprocOutGate received connection event: {:?}", event);
90                match &event {
91                    // Block new requests when connection enters Disconnected/Failed state
92                    ConnectionEvent::StateChanged {
93                        peer_id,
94                        state: ConnectionState::Disconnected | ConnectionState::Failed,
95                    } => {
96                        closing_peers.write().await.insert(peer_id.clone());
97                        tracing::debug!(
98                            "๐Ÿšซ Blocking new requests to peer {} (state: {:?})",
99                            peer_id.to_string_repr(),
100                            event
101                        );
102                    }
103
104                    // Clean pending requests and trigger downstream cleanup when connection is fully closed
105                    ConnectionEvent::StateChanged {
106                        peer_id,
107                        state: ConnectionState::Closed,
108                    }
109                    | ConnectionEvent::ConnectionClosed { peer_id } => {
110                        // Mark peer as closing (release lock immediately to avoid deadlock)
111                        {
112                            closing_peers.write().await.insert(peer_id.clone());
113                        } // Lock released here
114
115                        // 1. Trigger downstream cleanup (OutprocTransportManager โ†’ DestTransport โ†’ WirePool)
116                        // Note: We don't hold closing_peers lock here to avoid deadlock when
117                        // close_transport needs to acquire its own locks or when multiple
118                        // connections are closing simultaneously during shutdown.
119                        let dest = Dest::actor(peer_id.clone());
120                        match transport_manager.close_transport(&dest).await {
121                            Ok(_) => {
122                                tracing::info!(
123                                    "โœ… Successfully closed transport chain for peer {}",
124                                    peer_id.to_string_repr()
125                                );
126                            }
127                            Err(e) => {
128                                tracing::warn!(
129                                    "โš ๏ธ Failed to close transport for peer {}: {}",
130                                    peer_id.to_string_repr(),
131                                    e
132                                );
133                            }
134                        }
135
136                        // 2. Clean pending requests for this peer
137                        let mut pending = pending_requests.write().await;
138
139                        // Collect request_ids that belong to this peer
140                        let keys_to_remove: Vec<_> = pending
141                            .iter()
142                            .filter_map(|(req_id, (target, _))| {
143                                if target == peer_id {
144                                    Some(req_id.clone())
145                                } else {
146                                    None
147                                }
148                            })
149                            .collect();
150
151                        let cleaned_count = keys_to_remove.len();
152
153                        tracing::info!(
154                            "๐Ÿงน Cleaned {} pending requests for peer {}",
155                            cleaned_count,
156                            peer_id.to_string_repr()
157                        );
158
159                        // Remove and send error to all pending requests for this peer
160                        for key in keys_to_remove {
161                            if let Some((_, tx)) = pending.remove(&key) {
162                                let _ = tx.send(Err(ProtocolError::TransportError(
163                                    "Connection closed".to_string(),
164                                )));
165                            }
166                        }
167                        drop(pending); // Release lock before calling downstream
168
169                        // Unblock after cleanup completes
170                        closing_peers.write().await.remove(peer_id);
171                    }
172
173                    // Unblock peer when ICE restart succeeds
174                    ConnectionEvent::IceRestartCompleted {
175                        peer_id,
176                        success: true,
177                    } => {
178                        closing_peers.write().await.remove(peer_id);
179                        tracing::debug!(
180                            "โœ… Unblocked peer {} after successful ICE restart",
181                            peer_id.to_string_repr()
182                        );
183                    }
184
185                    _ => {} // Ignore other events
186                }
187            }
188        });
189    }
190
191    /// Handle response message (called by MessageDispatcher)
192    ///
193    /// # Arguments
194    /// - `request_id`: Request ID
195    /// - `result`: Response data (Ok) or error (Err)
196    ///
197    /// # Returns
198    /// - `Ok(true)`: Successfully woke up waiting request
199    /// - `Ok(false)`: No corresponding pending request found
200    pub async fn handle_response(
201        &self,
202        request_id: &str,
203        result: actr_protocol::ActorResult<Bytes>,
204    ) -> ActorResult<bool> {
205        let mut pending = self.pending_requests.write().await;
206
207        if let Some((target, tx)) = pending.remove(request_id) {
208            // Wake up waiting request with result (success or error)
209            let _ = tx.send(result);
210            tracing::debug!(
211                "โœ… Completed request: {} (target: {})",
212                request_id,
213                target.to_string_repr()
214            );
215            Ok(true)
216        } else {
217            tracing::warn!("โš ๏ธ  No pending request for: {}", request_id);
218            Ok(false)
219        }
220    }
221
222    /// Get pending requests count (for monitoring)
223    pub async fn pending_count(&self) -> usize {
224        self.pending_requests.read().await.len()
225    }
226
227    /// Get pending_requests reference (for WebRtcGate to share)
228    pub fn get_pending_requests(
229        &self,
230    ) -> Arc<RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>>
231    {
232        self.pending_requests.clone()
233    }
234
235    /// Convert ActrId to Dest
236    fn actr_id_to_dest(actor_id: &ActrId) -> Dest {
237        Dest::actor(actor_id.clone())
238    }
239
240    /// Serialize RpcEnvelope to bytes
241    fn serialize_envelope(envelope: &RpcEnvelope) -> Vec<u8> {
242        envelope.encode_to_vec()
243    }
244}
245
246impl OutprocOutGate {
247    /// Send request and wait for response (with specified PayloadType).
248    ///
249    /// This is primarily used by language bindings / non-generic RPC paths.
250    pub async fn send_request_with_type(
251        &self,
252        target: &ActrId,
253        payload_type: PayloadType,
254        envelope: RpcEnvelope,
255    ) -> ActorResult<Bytes> {
256        tracing::debug!(
257            "๐Ÿ“ค OutprocGate::send_request_with_type to {:?}, payload_type={:?}, request_id={}",
258            target,
259            payload_type,
260            envelope.request_id
261        );
262
263        // 1. Create oneshot channel for receiving response
264        let (response_tx, response_rx) = oneshot::channel();
265
266        // 2. Register pending request with target ActorId
267        {
268            let mut pending = self.pending_requests.write().await;
269            pending.insert(envelope.request_id.clone(), (target.clone(), response_tx));
270        }
271
272        // 3. Serialize RpcEnvelope
273        let data = Self::serialize_envelope(&envelope);
274
275        // 4. Convert ActrId to Dest
276        let dest = Self::actr_id_to_dest(target);
277
278        // 5. Send message using the specified payload_type
279        match self
280            .transport_manager
281            .send(&dest, payload_type, &data)
282            .await
283        {
284            Ok(_) => {
285                tracing::debug!("โœ… Sent request to {:?}", target);
286            }
287            Err(e) => {
288                // Send failed, remove pending request
289                self.pending_requests
290                    .write()
291                    .await
292                    .remove(&envelope.request_id);
293                return Err(ProtocolError::TransportError(e.to_string()));
294            }
295        }
296
297        // 6. Wait for response (timeout from envelope.timeout_ms)
298        let timeout = std::time::Duration::from_millis(envelope.timeout_ms as u64);
299
300        match tokio::time::timeout(timeout, response_rx).await {
301            Ok(Ok(result)) => {
302                // result is ActorResult<Bytes>, propagate it
303                tracing::debug!("โœ… Received response for request: {}", envelope.request_id);
304                result
305            }
306            Ok(Err(_)) => Err(ProtocolError::TransportError(
307                "Response channel closed".to_string(),
308            )),
309            Err(_) => {
310                // Timeout
311                self.pending_requests
312                    .write()
313                    .await
314                    .remove(&envelope.request_id);
315                Err(ProtocolError::TransportError(format!(
316                    "Request timeout: {}ms",
317                    envelope.timeout_ms
318                )))
319            }
320        }
321    }
322
323    /// Send request and wait for response (bidirectional communication)
324    #[cfg_attr(
325        feature = "opentelemetry",
326        tracing::instrument(skip_all, name = "OutprocOutGate.send_request")
327    )]
328    pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
329        self.send_request_with_type(target, PayloadType::RpcReliable, envelope)
330            .await
331    }
332
333    /// Send one-way message (no response expected)
334    #[cfg_attr(
335        feature = "opentelemetry",
336        tracing::instrument(skip_all, name = "OutprocOutGate.send_message", fields(target = ?target.to_string_repr()))
337    )]
338    pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
339        tracing::debug!(
340            "๐Ÿ“ค OutprocGate::send_message to {:?}",
341            target.to_string_repr()
342        );
343
344        // // Check if target is being cleaned up
345        // if self.closing_peers.read().await.contains(target) {
346        //     return Err(ProtocolError::TransportError(format!(
347        //         "Connection to {} is closing",
348        //         target.to_string_repr()
349        //     )));
350        // }
351
352        self.send_message_with_type(target, PayloadType::RpcReliable, envelope)
353            .await
354    }
355
356    /// Send one-way message with specified PayloadType.
357    pub async fn send_message_with_type(
358        &self,
359        target: &ActrId,
360        payload_type: PayloadType,
361        envelope: RpcEnvelope,
362    ) -> ActorResult<()> {
363        tracing::debug!(
364            "๐Ÿ“ค OutprocGate::send_message_with_type to {:?}, payload_type={:?}",
365            target.to_string_repr(),
366            payload_type
367        );
368
369        let data = Self::serialize_envelope(&envelope);
370        let dest = Self::actr_id_to_dest(target);
371        self.transport_manager
372            .send(&dest, payload_type, &data)
373            .await
374            .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
375        Ok(())
376    }
377
378    /// Send media sample via WebRTC native track
379    ///
380    /// # Parameters
381    /// - `target`: Target Actor ID
382    /// - `track_id`: Media track identifier
383    /// - `sample`: Media sample data
384    ///
385    /// # Implementation Note
386    /// Delegates to WebRtcCoordinator which manages WebRTC Tracks
387    pub async fn send_media_sample(
388        &self,
389        target: &ActrId,
390        track_id: &str,
391        sample: MediaSample,
392    ) -> ActorResult<()> {
393        tracing::debug!(
394            "๐Ÿ“ค OutprocGate::send_media_sample to {:?}, track_id={}",
395            target,
396            track_id
397        );
398
399        // Check if WebRTC coordinator is available
400        let coordinator = self.webrtc_coordinator.as_ref().ok_or_else(|| {
401            ProtocolError::Actr(actr_protocol::ActrError::NotImplemented {
402                feature: "MediaTrack requires WebRTC coordinator".to_string(),
403            })
404        })?;
405
406        // Delegate to WebRtcCoordinator
407        coordinator
408            .send_media_sample(target, track_id, sample)
409            .await
410            .map_err(|e| ProtocolError::TransportError(format!("WebRTC send failed: {e}")))?;
411
412        tracing::debug!("โœ… Sent media sample to {:?}", target);
413        Ok(())
414    }
415
416    /// Send DataStream (Fast Path)
417    ///
418    /// # Parameters
419    /// - `target`: Target Actor ID
420    /// - `payload_type`: PayloadType (StreamReliable or StreamLatencyFirst)
421    /// - `data`: Serialized DataStream bytes
422    ///
423    /// # Implementation Note
424    /// Sends via OutprocTransportManager using WebRTC DataChannel or WebSocket
425    pub async fn send_data_stream(
426        &self,
427        target: &ActrId,
428        payload_type: PayloadType,
429        data: Bytes,
430    ) -> ActorResult<()> {
431        tracing::debug!(
432            "๐Ÿ“ค OutprocGate::send_data_stream to {:?}, payload_type={:?}, size={} bytes",
433            target,
434            payload_type,
435            data.len()
436        );
437
438        // // Check if target is being cleaned up
439        // if self.closing_peers.read().await.contains(target) {
440        //     return Err(ProtocolError::TransportError(format!(
441        //         "Connection to {} is closing",
442        //         target.to_string_repr()
443        //     )));
444        // }
445
446        // Convert ActrId to Dest
447        let dest = Self::actr_id_to_dest(target);
448
449        // Send via transport manager
450        let result = self
451            .transport_manager
452            .send(&dest, payload_type, &data)
453            .await
454            .map_err(|e| ProtocolError::TransportError(e.to_string()));
455
456        result
457    }
458}
459
460impl Drop for OutprocOutGate {
461    fn drop(&mut self) {
462        tracing::debug!("๐Ÿ—‘๏ธ  OutprocGate dropped");
463    }
464}