1use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::outbound::OutGate;
7use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
8use actr_protocol::{ActorResult, ActrError, ActrId, ProtocolError, RpcEnvelope, RpcRequest};
9use async_trait::async_trait;
10use futures_util::future::BoxFuture;
11use std::sync::Arc;
12
13#[derive(Clone)]
28pub struct RuntimeContext {
29 self_id: ActrId,
30 caller_id: Option<ActrId>,
31 trace_id: String,
32 request_id: String,
33 inproc_gate: OutGate, outproc_gate: Option<OutGate>, data_stream_registry: Arc<DataStreamRegistry>, media_frame_registry: Arc<MediaFrameRegistry>, }
38
39impl RuntimeContext {
40 #[allow(clippy::too_many_arguments)] pub fn new(
54 self_id: ActrId,
55 caller_id: Option<ActrId>,
56 trace_id: String,
57 request_id: String,
58 inproc_gate: OutGate,
59 outproc_gate: Option<OutGate>,
60 data_stream_registry: Arc<DataStreamRegistry>,
61 media_frame_registry: Arc<MediaFrameRegistry>,
62 ) -> Self {
63 Self {
64 self_id,
65 caller_id,
66 trace_id,
67 request_id,
68 inproc_gate,
69 outproc_gate,
70 data_stream_registry,
71 media_frame_registry,
72 }
73 }
74
75 #[inline]
81 fn select_gate(&self, dest: &Dest) -> ActorResult<&OutGate> {
82 match dest {
83 Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
84 Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
85 ProtocolError::Actr(ActrError::GateNotInitialized {
86 message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
87 .to_string(),
88 })
89 }),
90 }
91 }
92
93 #[inline]
99 fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
100 match dest {
101 Dest::Shell | Dest::Local => &self.self_id,
102 Dest::Actor(id) => id,
103 }
104 }
105}
106
107#[async_trait]
108impl Context for RuntimeContext {
109 fn self_id(&self) -> &ActrId {
112 &self.self_id
113 }
114
115 fn caller_id(&self) -> Option<&ActrId> {
116 self.caller_id.as_ref()
117 }
118
119 fn trace_id(&self) -> &str {
120 &self.trace_id
121 }
122
123 fn request_id(&self) -> &str {
124 &self.request_id
125 }
126
127 async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
130 use actr_protocol::prost::Message as ProstMessage;
131
132 let payload: Bytes = request.encode_to_vec().into();
134
135 let route_key = R::route_key().to_string();
137
138 let envelope = RpcEnvelope {
140 route_key,
141 payload: Some(payload),
142 error: None,
143 trace_id: self.trace_id.clone(), request_id: uuid::Uuid::new_v4().to_string(), metadata: vec![],
146 timeout_ms: 30000, };
148
149 let gate = self.select_gate(target)?;
151 let target_id = self.extract_target_id(target);
152
153 let response_bytes = gate.send_request(target_id, envelope).await?;
155
156 R::Response::decode(&*response_bytes).map_err(|e| {
158 ProtocolError::Actr(ActrError::DecodeFailure {
159 message: format!(
160 "Failed to decode {}: {}",
161 std::any::type_name::<R::Response>(),
162 e
163 ),
164 })
165 })
166 }
167
168 async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
169 let payload: Bytes = message.encode_to_vec().into();
171
172 let route_key = R::route_key().to_string();
174
175 let envelope = RpcEnvelope {
177 route_key,
178 payload: Some(payload),
179 error: None,
180 trace_id: self.trace_id.clone(),
181 request_id: uuid::Uuid::new_v4().to_string(),
182 metadata: vec![],
183 timeout_ms: 0, };
185
186 let gate = self.select_gate(target)?;
188 let target_id = self.extract_target_id(target);
189
190 gate.send_message(target_id, envelope).await
192 }
193
194 async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
197 where
198 F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
199 {
200 tracing::debug!(
201 "📊 Registering DataStream callback for stream_id: {}",
202 stream_id
203 );
204 self.data_stream_registry
205 .register(stream_id, Arc::new(callback));
206 Ok(())
207 }
208
209 async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
210 tracing::debug!(
211 "🚫 Unregistering DataStream callback for stream_id: {}",
212 stream_id
213 );
214 self.data_stream_registry.unregister(stream_id);
215 Ok(())
216 }
217
218 async fn send_data_stream(&self, target: &Dest, chunk: DataStream) -> ActorResult<()> {
219 use actr_protocol::prost::Message as ProstMessage;
220
221 let payload = chunk.encode_to_vec();
223
224 tracing::debug!(
225 "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
226 chunk.stream_id,
227 chunk.sequence,
228 payload.len()
229 );
230
231 let gate = self.select_gate(target)?;
233 let target_id = self.extract_target_id(target);
234
235 gate.send_data_stream(
239 target_id,
240 actr_protocol::PayloadType::StreamReliable,
241 bytes::Bytes::from(payload),
242 )
243 .await
244 }
245
246 async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
249 where
250 F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
251 {
252 tracing::debug!(
253 "📹 Registering MediaTrack callback for track_id: {}",
254 track_id
255 );
256 self.media_frame_registry
257 .register(track_id, Arc::new(callback));
258 Ok(())
259 }
260
261 async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
262 tracing::debug!(
263 "📹 Unregistering MediaTrack callback for track_id: {}",
264 track_id
265 );
266 self.media_frame_registry.unregister(track_id);
267 Ok(())
268 }
269
270 async fn send_media_sample(
271 &self,
272 target: &Dest,
273 track_id: &str,
274 sample: MediaSample,
275 ) -> ActorResult<()> {
276 let gate = self.select_gate(target)?;
278
279 let target_id = self.extract_target_id(target);
281
282 gate.send_media_sample(target_id, track_id, sample).await
284 }
285}