1use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::OutGate;
7use crate::wire::webrtc::SignalingClient;
8#[cfg(feature = "opentelemetry")]
9use crate::wire::webrtc::trace::inject_span_context_to_rpc;
10use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
11use actr_protocol::{
12 AIdCredential, ActorResult, ActrError, ActrId, ActrType, PayloadType, ProtocolError,
13 RouteCandidatesRequest, RpcEnvelope, RpcRequest, route_candidates_request,
14};
15use async_trait::async_trait;
16use futures_util::future::BoxFuture;
17use std::sync::Arc;
18
19#[derive(Clone)]
34pub struct RuntimeContext {
35 self_id: ActrId,
36 caller_id: Option<ActrId>,
37 request_id: String,
38 inproc_gate: OutGate, outproc_gate: Option<OutGate>, data_stream_registry: Arc<DataStreamRegistry>, media_frame_registry: Arc<MediaFrameRegistry>, signaling_client: Arc<dyn SignalingClient>,
43 credential: AIdCredential,
44}
45
46impl RuntimeContext {
47 #[allow(clippy::too_many_arguments)] pub fn new(
62 self_id: ActrId,
63 caller_id: Option<ActrId>,
64 request_id: String,
65 inproc_gate: OutGate,
66 outproc_gate: Option<OutGate>,
67 data_stream_registry: Arc<DataStreamRegistry>,
68 media_frame_registry: Arc<MediaFrameRegistry>,
69 signaling_client: Arc<dyn SignalingClient>,
70 credential: AIdCredential,
71 ) -> Self {
72 Self {
73 self_id,
74 caller_id,
75 request_id,
76 inproc_gate,
77 outproc_gate,
78 data_stream_registry,
79 media_frame_registry,
80 signaling_client,
81 credential,
82 }
83 }
84
85 #[inline]
91 fn select_gate(&self, dest: &Dest) -> ActorResult<&OutGate> {
92 match dest {
93 Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
94 Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
95 ProtocolError::Actr(ActrError::GateNotInitialized {
96 message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
97 .to_string(),
98 })
99 }),
100 }
101 }
102
103 #[inline]
109 fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
110 match dest {
111 Dest::Shell | Dest::Local => &self.self_id,
112 Dest::Actor(id) => id,
113 }
114 }
115
116 #[cfg_attr(
118 feature = "opentelemetry",
119 tracing::instrument(skip_all, name = "RuntimeContext.call_raw")
120 )]
121 pub async fn call_raw(
122 &self,
123 target: &Dest,
124 route_key: String,
125 payload_type: PayloadType,
126 payload: Bytes,
127 timeout_ms: i64,
128 ) -> ActorResult<Bytes> {
129 #[cfg(feature = "opentelemetry")]
130 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
131
132 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
133 let mut envelope = RpcEnvelope {
134 route_key,
135 payload: Some(payload),
136 error: None,
137 traceparent: None,
138 tracestate: None,
139 request_id: uuid::Uuid::new_v4().to_string(),
140 metadata: vec![],
141 timeout_ms,
142 };
143 #[cfg(feature = "opentelemetry")]
144 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
145
146 let gate = self.select_gate(target)?;
147 let target_id = self.extract_target_id(target);
148 gate.send_request_with_type(target_id, payload_type, envelope)
149 .await
150 }
151
152 #[cfg_attr(
154 feature = "opentelemetry",
155 tracing::instrument(skip_all, name = "RuntimeContext.tell_raw")
156 )]
157 pub async fn tell_raw(
158 &self,
159 target: &Dest,
160 route_key: String,
161 payload_type: PayloadType,
162 payload: Bytes,
163 ) -> ActorResult<()> {
164 #[cfg(feature = "opentelemetry")]
165 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
166
167 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
168 let mut envelope = RpcEnvelope {
169 route_key,
170 payload: Some(payload),
171 error: None,
172 traceparent: None,
173 tracestate: None,
174 request_id: uuid::Uuid::new_v4().to_string(),
175 metadata: vec![],
176 timeout_ms: 0,
177 };
178 #[cfg(feature = "opentelemetry")]
179 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
180
181 let gate = self.select_gate(target)?;
182 let target_id = self.extract_target_id(target);
183 gate.send_message_with_type(target_id, payload_type, envelope)
184 .await
185 }
186
187 pub async fn send_data_stream_with_type(
192 &self,
193 target: &Dest,
194 payload_type: actr_protocol::PayloadType,
195 chunk: DataStream,
196 ) -> ActorResult<()> {
197 use actr_protocol::prost::Message as ProstMessage;
198
199 let payload = chunk.encode_to_vec();
200
201 let gate = self.select_gate(target)?;
202 let target_id = self.extract_target_id(target);
203
204 let result = gate
205 .send_data_stream(target_id, payload_type, bytes::Bytes::from(payload).into())
206 .await;
207
208 result
209 }
210}
211
212#[async_trait]
213impl Context for RuntimeContext {
214 fn self_id(&self) -> &ActrId {
217 &self.self_id
218 }
219
220 fn caller_id(&self) -> Option<&ActrId> {
221 self.caller_id.as_ref()
222 }
223
224 fn request_id(&self) -> &str {
225 &self.request_id
226 }
227
228 #[cfg_attr(
230 feature = "opentelemetry",
231 tracing::instrument(skip_all, name = "RuntimeContext.call")
232 )]
233 async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
234 use actr_protocol::prost::Message as ProstMessage;
235
236 let payload: Bytes = request.encode_to_vec().into();
238
239 let route_key = R::route_key().to_string();
241
242 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
244 let mut envelope = RpcEnvelope {
245 route_key,
246 payload: Some(payload),
247 error: None,
248 traceparent: None,
249 tracestate: None,
250 request_id: uuid::Uuid::new_v4().to_string(), metadata: vec![],
252 timeout_ms: 30000, };
254 #[cfg(feature = "opentelemetry")]
256 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
257
258 let gate = self.select_gate(target)?;
260 let target_id = self.extract_target_id(target);
261
262 let response_bytes = gate
265 .send_request_with_type(target_id, R::payload_type(), envelope)
266 .await?;
267
268 R::Response::decode(&*response_bytes).map_err(|e| {
270 ProtocolError::Actr(ActrError::DecodeFailure {
271 message: format!(
272 "Failed to decode {}: {}",
273 std::any::type_name::<R::Response>(),
274 e
275 ),
276 })
277 })
278 }
279
280 #[cfg_attr(
281 feature = "opentelemetry",
282 tracing::instrument(skip_all, name = "RuntimeContext.tell")
283 )]
284 async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
285 let payload: Bytes = message.encode_to_vec().into();
287
288 let route_key = R::route_key().to_string();
290
291 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
293 let mut envelope = RpcEnvelope {
294 route_key,
295 payload: Some(payload),
296 error: None,
297 traceparent: None,
298 tracestate: None,
299 request_id: uuid::Uuid::new_v4().to_string(),
300 metadata: vec![],
301 timeout_ms: 0, };
303 #[cfg(feature = "opentelemetry")]
305 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
306
307 let gate = self.select_gate(target)?;
309 let target_id = self.extract_target_id(target);
310
311 gate.send_message_with_type(target_id, R::payload_type(), envelope)
313 .await
314 }
315
316 async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
319 where
320 F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
321 {
322 tracing::debug!(
323 "📊 Registering DataStream callback for stream_id: {}",
324 stream_id
325 );
326 self.data_stream_registry
327 .register(stream_id, Arc::new(callback));
328 Ok(())
329 }
330
331 async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
332 tracing::debug!(
333 "🚫 Unregistering DataStream callback for stream_id: {}",
334 stream_id
335 );
336 self.data_stream_registry.unregister(stream_id);
337 Ok(())
338 }
339
340 async fn send_data_stream(&self, target: &Dest, chunk: DataStream) -> ActorResult<()> {
341 use actr_protocol::prost::Message as ProstMessage;
342
343 let payload = chunk.encode_to_vec();
345
346 tracing::debug!(
347 "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
348 chunk.stream_id,
349 chunk.sequence,
350 payload.len()
351 );
352
353 let gate = self.select_gate(target)?;
355 let target_id = self.extract_target_id(target);
356
357 gate.send_data_stream(
361 target_id,
362 actr_protocol::PayloadType::StreamReliable,
363 bytes::Bytes::from(payload),
364 )
365 .await
366 }
367
368 async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
369 if !self.signaling_client.is_connected() {
370 return Err(ProtocolError::TransportError(
371 "Signaling client is not connected.".to_string(),
372 ));
373 }
374
375 let criteria = route_candidates_request::NodeSelectionCriteria {
376 candidate_count: 1,
377 ranking_factors: Vec::new(),
378 minimal_dependency_requirement: None,
379 minimal_health_requirement: None,
380 };
381
382 let request = RouteCandidatesRequest {
383 target_type: target_type.clone(),
384 criteria: Some(criteria),
385 client_location: None,
386 };
387
388 let response = self
389 .signaling_client
390 .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
391 .await
392 .map_err(|e| {
393 ProtocolError::TransportError(format!("Route candidates request failed: {e}"))
394 })?;
395
396 match response.result {
397 Some(actr_protocol::route_candidates_response::Result::Success(ok)) => {
398 ok.candidates.into_iter().next().ok_or_else(|| {
399 ProtocolError::TargetNotFound(format!(
400 "No route candidates for type {}.{}",
401 target_type.manufacturer, target_type.name
402 ))
403 })
404 }
405 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
406 Err(ProtocolError::TransportError(format!(
407 "Route candidates error {}: {}",
408 err.code, err.message
409 )))
410 }
411 None => Err(ProtocolError::TransportError(
412 "Route candidates response missing result".to_string(),
413 )),
414 }
415 }
416
417 #[cfg_attr(
418 feature = "opentelemetry",
419 tracing::instrument(skip_all, name = "RuntimeContext.call_raw")
420 )]
421 async fn call_raw(
422 &self,
423 target: &ActrId,
424 route_key: &str,
425 payload: Bytes,
426 ) -> ActorResult<Bytes> {
427 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
429 let mut envelope = RpcEnvelope {
430 route_key: route_key.to_string(),
431 payload: Some(payload),
432 error: None,
433 traceparent: None,
434 tracestate: None,
435 request_id: uuid::Uuid::new_v4().to_string(),
436 metadata: vec![],
437 timeout_ms: 30000, };
439
440 #[cfg(feature = "opentelemetry")]
442 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
443
444 let gate = self.outproc_gate.as_ref().ok_or_else(|| {
446 ProtocolError::Actr(ActrError::GateNotInitialized {
447 message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
448 .to_string(),
449 })
450 })?;
451
452 gate.send_request(target, envelope).await
454 }
455
456 async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
459 where
460 F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
461 {
462 tracing::debug!(
463 "📹 Registering MediaTrack callback for track_id: {}",
464 track_id
465 );
466 self.media_frame_registry
467 .register(track_id, Arc::new(callback));
468 Ok(())
469 }
470
471 async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
472 tracing::debug!(
473 "📹 Unregistering MediaTrack callback for track_id: {}",
474 track_id
475 );
476 self.media_frame_registry.unregister(track_id);
477 Ok(())
478 }
479
480 async fn send_media_sample(
481 &self,
482 target: &Dest,
483 track_id: &str,
484 sample: MediaSample,
485 ) -> ActorResult<()> {
486 let gate = self.select_gate(target)?;
488
489 let target_id = self.extract_target_id(target);
491
492 gate.send_media_sample(target_id, track_id, sample).await
494 }
495}