actr_runtime/outbound/
outproc_out_gate.rs1use crate::transport::{Dest, OutprocTransportManager};
9use actr_framework::{Bytes, MediaSample};
10use actr_protocol::prost::Message as ProstMessage;
11use actr_protocol::{ActorResult, ActrId, PayloadType, ProtocolError, RpcEnvelope};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::{RwLock, oneshot};
15
16pub struct OutprocOutGate {
24 transport_manager: Arc<OutprocTransportManager>,
26
27 pending_requests:
30 Arc<RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>>,
31
32 webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
34}
35
36impl OutprocOutGate {
37 pub fn new(
43 transport_manager: Arc<OutprocTransportManager>,
44 webrtc_coordinator: Option<Arc<crate::wire::webrtc::WebRtcCoordinator>>,
45 ) -> Self {
46 Self {
47 transport_manager,
48 pending_requests: Arc::new(RwLock::new(HashMap::new())),
49 webrtc_coordinator,
50 }
51 }
52
53 pub async fn handle_response(
63 &self,
64 request_id: &str,
65 result: actr_protocol::ActorResult<Bytes>,
66 ) -> ActorResult<bool> {
67 let mut pending = self.pending_requests.write().await;
68
69 if let Some(tx) = pending.remove(request_id) {
70 let _ = tx.send(result);
72 tracing::debug!("✅ Completed request: {}", request_id);
73 Ok(true)
74 } else {
75 tracing::warn!("⚠️ No pending request for: {}", request_id);
76 Ok(false)
77 }
78 }
79
80 pub async fn pending_count(&self) -> usize {
82 self.pending_requests.read().await.len()
83 }
84
85 pub fn get_pending_requests(
87 &self,
88 ) -> Arc<RwLock<HashMap<String, oneshot::Sender<actr_protocol::ActorResult<Bytes>>>>> {
89 self.pending_requests.clone()
90 }
91
92 fn actr_id_to_dest(actor_id: &ActrId) -> Dest {
94 Dest::actor(actor_id.clone())
95 }
96
97 fn serialize_envelope(envelope: &RpcEnvelope) -> Vec<u8> {
99 envelope.encode_to_vec()
100 }
101}
102
103impl OutprocOutGate {
104 pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
106 tracing::debug!(
107 "📤 OutprocGate::send_request to {:?}, request_id={}",
108 target,
109 envelope.request_id
110 );
111
112 let (response_tx, response_rx) = oneshot::channel();
114
115 {
117 let mut pending = self.pending_requests.write().await;
118 pending.insert(envelope.request_id.clone(), response_tx);
119 }
120
121 let data = Self::serialize_envelope(&envelope);
123
124 let dest = Self::actr_id_to_dest(target);
126
127 match self
129 .transport_manager
130 .send(&dest, PayloadType::RpcReliable, &data)
131 .await
132 {
133 Ok(_) => {
134 tracing::debug!("✅ Sent request to {:?}", target);
135 }
136 Err(e) => {
137 self.pending_requests
139 .write()
140 .await
141 .remove(&envelope.request_id);
142 return Err(ProtocolError::TransportError(e.to_string()));
143 }
144 }
145
146 let timeout = std::time::Duration::from_millis(envelope.timeout_ms as u64);
148
149 match tokio::time::timeout(timeout, response_rx).await {
150 Ok(Ok(result)) => {
151 tracing::debug!("✅ Received response for request: {}", envelope.request_id);
153 result
154 }
155 Ok(Err(_)) => {
156 Err(ProtocolError::TransportError(
158 "Response channel closed".to_string(),
159 ))
160 }
161 Err(_) => {
162 self.pending_requests
164 .write()
165 .await
166 .remove(&envelope.request_id);
167 Err(ProtocolError::TransportError(format!(
168 "Request timeout: {}ms",
169 envelope.timeout_ms
170 )))
171 }
172 }
173 }
174
175 pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
177 tracing::debug!("📤 OutprocGate::send_message to {:?}", target);
178
179 let data = Self::serialize_envelope(&envelope);
181
182 let dest = Self::actr_id_to_dest(target);
184
185 self.transport_manager
187 .send(&dest, PayloadType::RpcReliable, &data)
188 .await
189 .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
190
191 tracing::debug!("✅ Sent message to {:?}", target);
192 Ok(())
193 }
194
195 pub async fn send_media_sample(
205 &self,
206 target: &ActrId,
207 track_id: &str,
208 sample: MediaSample,
209 ) -> ActorResult<()> {
210 tracing::debug!(
211 "📤 OutprocGate::send_media_sample to {:?}, track_id={}",
212 target,
213 track_id
214 );
215
216 let coordinator = self.webrtc_coordinator.as_ref().ok_or_else(|| {
218 ProtocolError::Actr(actr_protocol::ActrError::NotImplemented {
219 feature: "MediaTrack requires WebRTC coordinator".to_string(),
220 })
221 })?;
222
223 coordinator
225 .send_media_sample(target, track_id, sample)
226 .await
227 .map_err(|e| ProtocolError::TransportError(format!("WebRTC send failed: {e}")))?;
228
229 tracing::debug!("✅ Sent media sample to {:?}", target);
230 Ok(())
231 }
232
233 pub async fn send_data_stream(
243 &self,
244 target: &ActrId,
245 payload_type: PayloadType,
246 data: Bytes,
247 ) -> ActorResult<()> {
248 tracing::debug!(
249 "📤 OutprocGate::send_data_stream to {:?}, payload_type={:?}, size={} bytes",
250 target,
251 payload_type,
252 data.len()
253 );
254
255 let dest = Self::actr_id_to_dest(target);
257
258 self.transport_manager
260 .send(&dest, payload_type, &data)
261 .await
262 .map_err(|e| ProtocolError::TransportError(e.to_string()))?;
263
264 tracing::debug!("✅ Sent DataStream to {:?}", target);
265 Ok(())
266 }
267}
268
269impl Drop for OutprocOutGate {
270 fn drop(&mut self) {
271 tracing::debug!("🗑️ OutprocGate dropped");
272 }
273}