actr_runtime/outbound/outproc_out_gate.rs
1//! OutprocOutGate - Outproc transport adapter (outbound)
2//!
3//! # Responsibilities
4//! - Wrap OutprocTransportManager (Protobuf serialization)
5//! - Used for cross-process communication (WebRTC + WebSocket)
6//! - Maintain pending_requests (Request/Response matching)
7//! - Block new requests to peers being cleaned up (closing_peers)
8
9use crate::transport::connection_event::{ConnectionEvent, ConnectionState};
10use crate::transport::{Dest, OutprocTransportManager};
11use actr_framework::{Bytes, MediaSample};
12use actr_protocol::prost::Message as ProstMessage;
13use actr_protocol::{ActorResult, ActrId, ActrIdExt, PayloadType, ProtocolError, RpcEnvelope};
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use tokio::sync::{RwLock, broadcast, oneshot};
17
18/// OutprocOutGate - Outproc transport adapter (outbound)
19///
20/// # Features
21/// - Protobuf serialization: serialize RpcEnvelope to byte stream
22/// - Defaults to PayloadType::RpcReliable for RPC messages
23/// - Maintain pending_requests for Request/Response matching
24/// - Support MediaTrack sending via WebRTC
25/// - Block new requests to peers being cleaned up (closing_peers)
26pub struct OutprocOutGate {
27 /// OutprocTransportManager instance
28 transport_manager: Arc<OutprocTransportManager>,
29
30 /// Pending requests: request_id โ (target_actor_id, oneshot::Sender<Bytes>)
31 /// Stores both the target ActorId and response sender for efficient cleanup by peer
32 pending_requests:
33 Arc<RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>>,
34
35 /// WebRTC coordinator (optional, for MediaTrack support)
36 webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
37
38 #[allow(unused)]
39 /// todo: Peers currently being cleaned up (block new requests) ,closed requests will be cleaned up in event listener
40 closing_peers: Arc<RwLock<HashSet<ActrId>>>,
41}
42
43impl OutprocOutGate {
44 /// Create new OutprocOutGate
45 ///
46 /// # Arguments
47 /// - `transport_manager`: OutprocTransportManager instance
48 /// - `webrtc_coordinator`: Optional WebRTC coordinator for MediaTrack support
49 pub fn new(
50 transport_manager: Arc<OutprocTransportManager>,
51 webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
52 ) -> Self {
53 let closing_peers = Arc::new(RwLock::new(HashSet::new()));
54 let pending_requests = Arc::new(RwLock::new(HashMap::new()));
55
56 // Start event listener if coordinator is available
57 // This is the ONLY event subscriber - it triggers top-down cleanup
58 if let Some(ref coordinator) = webrtc_coordinator {
59 Self::spawn_event_listener(
60 coordinator.subscribe_events(),
61 Arc::clone(&pending_requests),
62 Arc::clone(&closing_peers),
63 Arc::clone(&transport_manager),
64 );
65 }
66
67 Self {
68 transport_manager,
69 pending_requests,
70 webrtc_coordinator,
71 closing_peers,
72 }
73 }
74
75 /// Spawn event listener task to handle connection events
76 ///
77 /// This is the **ONLY** event subscriber in the cleanup chain.
78 /// It triggers top-down cleanup by calling transport_manager.close_transport().
79 fn spawn_event_listener(
80 mut event_rx: broadcast::Receiver<ConnectionEvent>,
81 pending_requests: Arc<
82 RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>,
83 >,
84 closing_peers: Arc<RwLock<HashSet<ActrId>>>,
85 transport_manager: Arc<OutprocTransportManager>,
86 ) {
87 tokio::spawn(async move {
88 while let Ok(event) = event_rx.recv().await {
89 tracing::debug!("๐ OutprocOutGate received connection event: {:?}", event);
90 match &event {
91 // Block new requests when connection enters Disconnected/Failed state
92 ConnectionEvent::StateChanged {
93 peer_id,
94 state: ConnectionState::Disconnected | ConnectionState::Failed,
95 } => {
96 closing_peers.write().await.insert(peer_id.clone());
97 tracing::debug!(
98 "๐ซ Blocking new requests to peer {} (state: {:?})",
99 peer_id.to_string_repr(),
100 event
101 );
102 }
103
104 // Clean pending requests and trigger downstream cleanup when connection is fully closed
105 ConnectionEvent::StateChanged {
106 peer_id,
107 state: ConnectionState::Closed,
108 }
109 | ConnectionEvent::ConnectionClosed { peer_id } => {
110 // Mark peer as closing (release lock immediately to avoid deadlock)
111 {
112 closing_peers.write().await.insert(peer_id.clone());
113 } // Lock released here
114
115 // 1. Trigger downstream cleanup (OutprocTransportManager โ DestTransport โ WirePool)
116 // Note: We don't hold closing_peers lock here to avoid deadlock when
117 // close_transport needs to acquire its own locks or when multiple
118 // connections are closing simultaneously during shutdown.
119 let dest = Dest::actor(peer_id.clone());
120 match transport_manager.close_transport(&dest).await {
121 Ok(_) => {
122 tracing::info!(
123 "โ
Successfully closed transport chain for peer {}",
124 peer_id.to_string_repr()
125 );
126 }
127 Err(e) => {
128 tracing::warn!(
129 "โ ๏ธ Failed to close transport for peer {}: {}",
130 peer_id.to_string_repr(),
131 e
132 );
133 }
134 }
135
136 // 2. Clean pending requests for this peer
137 let mut pending = pending_requests.write().await;
138
139 // Collect request_ids that belong to this peer
140 let keys_to_remove: Vec<_> = pending
141 .iter()
142 .filter_map(|(req_id, (target, _))| {
143 if target == peer_id {
144 Some(req_id.clone())
145 } else {
146 None
147 }
148 })
149 .collect();
150
151 let cleaned_count = keys_to_remove.len();
152
153 tracing::info!(
154 "๐งน Cleaned {} pending requests for peer {}",
155 cleaned_count,
156 peer_id.to_string_repr()
157 );
158
159 // Remove and send error to all pending requests for this peer
160 for key in keys_to_remove {
161 if let Some((_, tx)) = pending.remove(&key) {
162 let _ = tx.send(Err(ProtocolError::TransportError(
163 "Connection closed".to_string(),
164 )));
165 }
166 }
167 drop(pending); // Release lock before calling downstream
168
169 // Unblock after cleanup completes
170 closing_peers.write().await.remove(peer_id);
171 }
172
173 // Unblock peer when ICE restart succeeds
174 ConnectionEvent::IceRestartCompleted {
175 peer_id,
176 success: true,
177 } => {
178 closing_peers.write().await.remove(peer_id);
179 tracing::debug!(
180 "โ
Unblocked peer {} after successful ICE restart",
181 peer_id.to_string_repr()
182 );
183 }
184
185 _ => {} // Ignore other events
186 }
187 }
188 });
189 }
190
191 /// Handle response message (called by MessageDispatcher)
192 ///
193 /// # Arguments
194 /// - `request_id`: Request ID
195 /// - `result`: Response data (Ok) or error (Err)
196 ///
197 /// # Returns
198 /// - `Ok(true)`: Successfully woke up waiting request
199 /// - `Ok(false)`: No corresponding pending request found
200 pub async fn handle_response(
201 &self,
202 request_id: &str,
203 result: actr_protocol::ActorResult<Bytes>,
204 ) -> ActorResult<bool> {
205 let mut pending = self.pending_requests.write().await;
206
207 if let Some((target, tx)) = pending.remove(request_id) {
208 // Wake up waiting request with result (success or error)
209 let _ = tx.send(result);
210 tracing::debug!(
211 "โ
Completed request: {} (target: {})",
212 request_id,
213 target.to_string_repr()
214 );
215 Ok(true)
216 } else {
217 tracing::warn!("โ ๏ธ No pending request for: {}", request_id);
218 Ok(false)
219 }
220 }
221
222 /// Get pending requests count (for monitoring)
223 pub async fn pending_count(&self) -> usize {
224 self.pending_requests.read().await.len()
225 }
226
227 /// Get pending_requests reference (for WebRtcGate to share)
228 pub fn get_pending_requests(
229 &self,
230 ) -> Arc<RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>>
231 {
232 self.pending_requests.clone()
233 }
234
235 /// Convert ActrId to Dest
236 fn actr_id_to_dest(actor_id: &ActrId) -> Dest {
237 Dest::actor(actor_id.clone())
238 }
239
240 /// Serialize RpcEnvelope to bytes
241 fn serialize_envelope(envelope: &RpcEnvelope) -> Vec<u8> {
242 envelope.encode_to_vec()
243 }
244}
245
246impl OutprocOutGate {
247 /// Send request and wait for response (with specified PayloadType).
248 ///
249 /// This is primarily used by language bindings / non-generic RPC paths.
250 pub async fn send_request_with_type(
251 &self,
252 target: &ActrId,
253 payload_type: PayloadType,
254 envelope: RpcEnvelope,
255 ) -> ActorResult<Bytes> {
256 tracing::debug!(
257 "๐ค OutprocGate::send_request_with_type to {:?}, payload_type={:?}, request_id={}",
258 target,
259 payload_type,
260 envelope.request_id
261 );
262
263 // 1. Create oneshot channel for receiving response
264 let (response_tx, response_rx) = oneshot::channel();
265
266 // 2. Register pending request with target ActorId
267 {
268 let mut pending = self.pending_requests.write().await;
269 pending.insert(envelope.request_id.clone(), (target.clone(), response_tx));
270 }
271
272 // 3. Serialize RpcEnvelope
273 let data = Self::serialize_envelope(&envelope);
274
275 // 4. Convert ActrId to Dest
276 let dest = Self::actr_id_to_dest(target);
277
278 // 5. Send message using the specified payload_type
279 match self
280 .transport_manager
281 .send(&dest, payload_type, &data)
282 .await
283 {
284 Ok(_) => {
285 tracing::debug!("โ
Sent request to {:?}", target);
286 }
287 Err(e) => {
288 // Send failed, remove pending request
289 self.pending_requests
290 .write()
291 .await
292 .remove(&envelope.request_id);
293 return Err(ProtocolError::TransportError(e.to_string()));
294 }
295 }
296
297 // 6. Wait for response (timeout from envelope.timeout_ms)
298 let timeout = std::time::Duration::from_millis(envelope.timeout_ms as u64);
299
300 match tokio::time::timeout(timeout, response_rx).await {
301 Ok(Ok(result)) => {
302 // result is ActorResult<Bytes>, propagate it
303 tracing::debug!("โ
Received response for request: {}", envelope.request_id);
304 result
305 }
306 Ok(Err(_)) => Err(ProtocolError::TransportError(
307 "Response channel closed".to_string(),
308 )),
309 Err(_) => {
310 // Timeout
311 self.pending_requests
312 .write()
313 .await
314 .remove(&envelope.request_id);
315 Err(ProtocolError::TransportError(format!(
316 "Request timeout: {}ms",
317 envelope.timeout_ms
318 )))
319 }
320 }
321 }
322
323 /// Send request and wait for response (bidirectional communication)
324 #[cfg_attr(
325 feature = "opentelemetry",
326 tracing::instrument(skip_all, name = "OutprocOutGate.send_request")
327 )]
328 pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
329 self.send_request_with_type(target, PayloadType::RpcReliable, envelope)
330 .await
331 }
332
333 /// Send one-way message (no response expected)
334 #[cfg_attr(
335 feature = "opentelemetry",
336 tracing::instrument(skip_all, name = "OutprocOutGate.send_message", fields(target = ?target.to_string_repr()))
337 )]
338 pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
339 tracing::debug!(
340 "๐ค OutprocGate::send_message to {:?}",
341 target.to_string_repr()
342 );
343
344 // // Check if target is being cleaned up
345 // if self.closing_peers.read().await.contains(target) {
346 // return Err(ProtocolError::TransportError(format!(
347 // "Connection to {} is closing",
348 // target.to_string_repr()
349 // )));
350 // }
351
352 self.send_message_with_type(target, PayloadType::RpcReliable, envelope)
353 .await
354 }
355
356 /// Send one-way message with specified PayloadType.
357 pub async fn send_message_with_type(
358 &self,
359 target: &ActrId,
360 payload_type: PayloadType,
361 envelope: RpcEnvelope,
362 ) -> ActorResult<()> {
363 tracing::debug!(
364 "๐ค OutprocGate::send_message_with_type to {:?}, payload_type={:?}",
365 target.to_string_repr(),
366 payload_type
367 );
368
369 let data = Self::serialize_envelope(&envelope);
370 let dest = Self::actr_id_to_dest(target);
371 self.transport_manager
372 .send(&dest, payload_type, &data)
373 .await
374 .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
375 Ok(())
376 }
377
378 /// Send media sample via WebRTC native track
379 ///
380 /// # Parameters
381 /// - `target`: Target Actor ID
382 /// - `track_id`: Media track identifier
383 /// - `sample`: Media sample data
384 ///
385 /// # Implementation Note
386 /// Delegates to WebRtcCoordinator which manages WebRTC Tracks
387 pub async fn send_media_sample(
388 &self,
389 target: &ActrId,
390 track_id: &str,
391 sample: MediaSample,
392 ) -> ActorResult<()> {
393 tracing::debug!(
394 "๐ค OutprocGate::send_media_sample to {:?}, track_id={}",
395 target,
396 track_id
397 );
398
399 // Check if WebRTC coordinator is available
400 let coordinator = self.webrtc_coordinator.as_ref().ok_or_else(|| {
401 ProtocolError::Actr(actr_protocol::ActrError::NotImplemented {
402 feature: "MediaTrack requires WebRTC coordinator".to_string(),
403 })
404 })?;
405
406 // Delegate to WebRtcCoordinator
407 coordinator
408 .send_media_sample(target, track_id, sample)
409 .await
410 .map_err(|e| ProtocolError::TransportError(format!("WebRTC send failed: {e}")))?;
411
412 tracing::debug!("โ
Sent media sample to {:?}", target);
413 Ok(())
414 }
415
416 /// Send DataStream (Fast Path)
417 ///
418 /// # Parameters
419 /// - `target`: Target Actor ID
420 /// - `payload_type`: PayloadType (StreamReliable or StreamLatencyFirst)
421 /// - `data`: Serialized DataStream bytes
422 ///
423 /// # Implementation Note
424 /// Sends via OutprocTransportManager using WebRTC DataChannel or WebSocket
425 pub async fn send_data_stream(
426 &self,
427 target: &ActrId,
428 payload_type: PayloadType,
429 data: Bytes,
430 ) -> ActorResult<()> {
431 tracing::debug!(
432 "๐ค OutprocGate::send_data_stream to {:?}, payload_type={:?}, size={} bytes",
433 target,
434 payload_type,
435 data.len()
436 );
437
438 // // Check if target is being cleaned up
439 // if self.closing_peers.read().await.contains(target) {
440 // return Err(ProtocolError::TransportError(format!(
441 // "Connection to {} is closing",
442 // target.to_string_repr()
443 // )));
444 // }
445
446 // Convert ActrId to Dest
447 let dest = Self::actr_id_to_dest(target);
448
449 // Send via transport manager
450 let result = self
451 .transport_manager
452 .send(&dest, payload_type, &data)
453 .await
454 .map_err(|e| ProtocolError::TransportError(e.to_string()));
455
456 result
457 }
458}
459
460impl Drop for OutprocOutGate {
461 fn drop(&mut self) {
462 tracing::debug!("๐๏ธ OutprocGate dropped");
463 }
464}