1use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::OutGate;
7use crate::wire::webrtc::SignalingClient;
8use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
9use actr_protocol::{
10 AIdCredential, ActorResult, ActrError, ActrId, ActrType, ProtocolError, RouteCandidatesRequest,
11 RpcEnvelope, RpcRequest, route_candidates_request,
12};
13use async_trait::async_trait;
14use futures_util::future::BoxFuture;
15use std::sync::Arc;
16
17#[derive(Clone)]
32pub struct RuntimeContext {
33 self_id: ActrId,
34 caller_id: Option<ActrId>,
35 request_id: String,
36 inproc_gate: OutGate, outproc_gate: Option<OutGate>, data_stream_registry: Arc<DataStreamRegistry>, media_frame_registry: Arc<MediaFrameRegistry>, signaling_client: Arc<dyn SignalingClient>,
41 credential: AIdCredential,
42}
43
44impl RuntimeContext {
45 #[allow(clippy::too_many_arguments)] pub fn new(
60 self_id: ActrId,
61 caller_id: Option<ActrId>,
62 request_id: String,
63 inproc_gate: OutGate,
64 outproc_gate: Option<OutGate>,
65 data_stream_registry: Arc<DataStreamRegistry>,
66 media_frame_registry: Arc<MediaFrameRegistry>,
67 signaling_client: Arc<dyn SignalingClient>,
68 credential: AIdCredential,
69 ) -> Self {
70 Self {
71 self_id,
72 caller_id,
73 request_id,
74 inproc_gate,
75 outproc_gate,
76 data_stream_registry,
77 media_frame_registry,
78 signaling_client,
79 credential,
80 }
81 }
82
83 #[inline]
89 fn select_gate(&self, dest: &Dest) -> ActorResult<&OutGate> {
90 match dest {
91 Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
92 Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
93 ProtocolError::Actr(ActrError::GateNotInitialized {
94 message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
95 .to_string(),
96 })
97 }),
98 }
99 }
100
101 #[inline]
107 fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
108 match dest {
109 Dest::Shell | Dest::Local => &self.self_id,
110 Dest::Actor(id) => id,
111 }
112 }
113}
114
115#[async_trait]
116impl Context for RuntimeContext {
117 fn self_id(&self) -> &ActrId {
120 &self.self_id
121 }
122
123 fn caller_id(&self) -> Option<&ActrId> {
124 self.caller_id.as_ref()
125 }
126
127 fn request_id(&self) -> &str {
128 &self.request_id
129 }
130
131 async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
134 use actr_protocol::prost::Message as ProstMessage;
135
136 let payload: Bytes = request.encode_to_vec().into();
138
139 let route_key = R::route_key().to_string();
141
142 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
144 let mut envelope = RpcEnvelope {
145 route_key,
146 payload: Some(payload),
147 error: None,
148 traceparent: None,
149 tracestate: None,
150 request_id: uuid::Uuid::new_v4().to_string(), metadata: vec![],
152 timeout_ms: 30000, };
154 #[cfg(feature = "opentelemetry")]
156 {
157 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
158 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
159 }
160
161 let gate = self.select_gate(target)?;
163 let target_id = self.extract_target_id(target);
164
165 let response_bytes = gate.send_request(target_id, envelope).await?;
167
168 R::Response::decode(&*response_bytes).map_err(|e| {
170 ProtocolError::Actr(ActrError::DecodeFailure {
171 message: format!(
172 "Failed to decode {}: {}",
173 std::any::type_name::<R::Response>(),
174 e
175 ),
176 })
177 })
178 }
179
180 async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
181 let payload: Bytes = message.encode_to_vec().into();
183
184 let route_key = R::route_key().to_string();
186
187 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
189 let mut envelope = RpcEnvelope {
190 route_key,
191 payload: Some(payload),
192 error: None,
193 traceparent: None,
194 tracestate: None,
195 request_id: uuid::Uuid::new_v4().to_string(),
196 metadata: vec![],
197 timeout_ms: 0, };
199 #[cfg(feature = "opentelemetry")]
201 {
202 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
203 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
204 }
205
206 let gate = self.select_gate(target)?;
208 let target_id = self.extract_target_id(target);
209
210 gate.send_message(target_id, envelope).await
212 }
213
214 async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
217 where
218 F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
219 {
220 tracing::debug!(
221 "📊 Registering DataStream callback for stream_id: {}",
222 stream_id
223 );
224 self.data_stream_registry
225 .register(stream_id, Arc::new(callback));
226 Ok(())
227 }
228
229 async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
230 tracing::debug!(
231 "🚫 Unregistering DataStream callback for stream_id: {}",
232 stream_id
233 );
234 self.data_stream_registry.unregister(stream_id);
235 Ok(())
236 }
237
238 async fn send_data_stream(&self, target: &Dest, chunk: DataStream) -> ActorResult<()> {
239 use actr_protocol::prost::Message as ProstMessage;
240
241 let payload = chunk.encode_to_vec();
243
244 tracing::debug!(
245 "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
246 chunk.stream_id,
247 chunk.sequence,
248 payload.len()
249 );
250
251 let gate = self.select_gate(target)?;
253 let target_id = self.extract_target_id(target);
254
255 gate.send_data_stream(
259 target_id,
260 actr_protocol::PayloadType::StreamReliable,
261 bytes::Bytes::from(payload),
262 )
263 .await
264 }
265
266 async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
267 if !self.signaling_client.is_connected() {
268 return Err(ProtocolError::TransportError(
269 "Signaling client is not connected.".to_string(),
270 ));
271 }
272
273 let criteria = route_candidates_request::NodeSelectionCriteria {
274 candidate_count: 1,
275 ranking_factors: Vec::new(),
276 minimal_dependency_requirement: None,
277 minimal_health_requirement: None,
278 };
279
280 let request = RouteCandidatesRequest {
281 target_type: target_type.clone(),
282 criteria: Some(criteria),
283 client_location: None,
284 };
285
286 let response = self
287 .signaling_client
288 .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
289 .await
290 .map_err(|e| {
291 ProtocolError::TransportError(format!("Route candidates request failed: {e}"))
292 })?;
293
294 match response.result {
295 Some(actr_protocol::route_candidates_response::Result::Success(ok)) => {
296 ok.candidates.into_iter().next().ok_or_else(|| {
297 ProtocolError::TargetNotFound(format!(
298 "No route candidates for type {}.{}",
299 target_type.manufacturer, target_type.name
300 ))
301 })
302 }
303 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
304 Err(ProtocolError::TransportError(format!(
305 "Route candidates error {}: {}",
306 err.code, err.message
307 )))
308 }
309 None => Err(ProtocolError::TransportError(
310 "Route candidates response missing result".to_string(),
311 )),
312 }
313 }
314
315 async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
318 where
319 F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
320 {
321 tracing::debug!(
322 "📹 Registering MediaTrack callback for track_id: {}",
323 track_id
324 );
325 self.media_frame_registry
326 .register(track_id, Arc::new(callback));
327 Ok(())
328 }
329
330 async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
331 tracing::debug!(
332 "📹 Unregistering MediaTrack callback for track_id: {}",
333 track_id
334 );
335 self.media_frame_registry.unregister(track_id);
336 Ok(())
337 }
338
339 async fn send_media_sample(
340 &self,
341 target: &Dest,
342 track_id: &str,
343 sample: MediaSample,
344 ) -> ActorResult<()> {
345 let gate = self.select_gate(target)?;
347
348 let target_id = self.extract_target_id(target);
350
351 gate.send_media_sample(target_id, track_id, sample).await
353 }
354}