use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
use crate::outbound::Gate;
use crate::wire::webrtc::SignalingClient;
#[cfg(feature = "opentelemetry")]
use crate::wire::webrtc::trace::inject_span_context_to_rpc;
use actr_config::lock::LockFile;
use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
use actr_protocol::{
AIdCredential, ActorResult, ActrError, ActrId, ActrType, PayloadType, RouteCandidatesRequest,
RpcEnvelope, RpcRequest, route_candidates_request,
};
use async_trait::async_trait;
use futures_util::future::BoxFuture;
use std::sync::Arc;
#[derive(Clone)]
pub struct RuntimeContext {
self_id: ActrId,
caller_id: Option<ActrId>,
request_id: String,
inproc_gate: Gate, outproc_gate: Option<Gate>, data_stream_registry: Arc<DataStreamRegistry>, media_frame_registry: Arc<MediaFrameRegistry>, signaling_client: Arc<dyn SignalingClient>,
credential: AIdCredential,
actr_lock: Option<Arc<LockFile>>, }
impl RuntimeContext {
#[allow(clippy::too_many_arguments)] pub(crate) fn new(
self_id: ActrId,
caller_id: Option<ActrId>,
request_id: String,
inproc_gate: Gate,
outproc_gate: Option<Gate>,
data_stream_registry: Arc<DataStreamRegistry>,
media_frame_registry: Arc<MediaFrameRegistry>,
signaling_client: Arc<dyn SignalingClient>,
credential: AIdCredential,
actr_lock: Option<Arc<LockFile>>,
) -> Self {
Self {
self_id,
caller_id,
request_id,
inproc_gate,
outproc_gate,
data_stream_registry,
media_frame_registry,
signaling_client,
credential,
actr_lock,
}
}
#[inline]
fn select_gate(&self, dest: &Dest) -> ActorResult<&Gate> {
match dest {
Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
ActrError::Internal(
"PeerGate not initialized yet (WebRTC setup in progress)".to_string(),
)
}),
}
}
#[inline]
fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
match dest {
Dest::Shell | Dest::Local => &self.self_id,
Dest::Actor(id) => id,
}
}
#[cfg_attr(
feature = "opentelemetry",
tracing::instrument(
skip_all,
name = "RuntimeContext.call_raw",
fields(
actr_id = %self.self_id,
route_key = %route_key,
)
)
)]
pub async fn call_raw(
&self,
target: &Dest,
route_key: String,
payload_type: PayloadType,
payload: Bytes,
timeout_ms: i64,
) -> ActorResult<Bytes> {
#[cfg(feature = "opentelemetry")]
use crate::wire::webrtc::trace::inject_span_context_to_rpc;
#[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
let mut envelope = RpcEnvelope {
route_key,
payload: Some(payload),
error: None,
traceparent: None,
tracestate: None,
request_id: uuid::Uuid::new_v4().to_string(),
metadata: vec![],
timeout_ms,
};
#[cfg(feature = "opentelemetry")]
inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
gate.send_request_with_type(target_id, payload_type, envelope)
.await
}
#[cfg_attr(
feature = "opentelemetry",
tracing::instrument(
skip_all,
name = "RuntimeContext.tell_raw",
fields(
actr_id = %self.self_id,
route_key = %route_key,
)
)
)]
pub async fn tell_raw(
&self,
target: &Dest,
route_key: String,
payload_type: PayloadType,
payload: Bytes,
) -> ActorResult<()> {
#[cfg(feature = "opentelemetry")]
use crate::wire::webrtc::trace::inject_span_context_to_rpc;
#[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
let mut envelope = RpcEnvelope {
route_key,
payload: Some(payload),
error: None,
traceparent: None,
tracestate: None,
request_id: uuid::Uuid::new_v4().to_string(),
metadata: vec![],
timeout_ms: 0,
};
#[cfg(feature = "opentelemetry")]
inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
gate.send_message_with_type(target_id, payload_type, envelope)
.await
}
pub async fn send_data_stream_with_type(
&self,
target: &Dest,
payload_type: actr_protocol::PayloadType,
chunk: DataStream,
) -> ActorResult<()> {
use actr_protocol::prost::Message as ProstMessage;
let payload = chunk.encode_to_vec();
let stream_id = chunk.stream_id.as_str();
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
gate.send_data_stream(
target_id,
payload_type,
stream_id,
bytes::Bytes::from(payload),
)
.await
}
fn get_dependency_fingerprint(&self, target_type: &ActrType) -> Option<String> {
let actr_lock = self.actr_lock.as_ref()?;
let key = target_type.to_string_repr();
if let Some(dep) = actr_lock.get_dependency(&key) {
return Some(dep.fingerprint.clone());
}
for dep in &actr_lock.dependencies {
if Self::matches_dependency_actr_type(&dep.actr_type, target_type) {
return Some(dep.fingerprint.clone());
}
}
None
}
fn matches_dependency_actr_type(raw: &str, target_type: &ActrType) -> bool {
let Ok(dep_type) = ActrType::from_string_repr(raw) else {
return false;
};
dep_type == *target_type
}
async fn send_discovery_request(
&self,
target_type: &ActrType,
candidate_count: u32,
client_fingerprint: String,
) -> ActorResult<InternalDiscoveryResult> {
let criteria = route_candidates_request::NodeSelectionCriteria {
candidate_count,
ranking_factors: Vec::new(),
minimal_dependency_requirement: None,
minimal_health_requirement: None,
};
let request = RouteCandidatesRequest {
target_type: target_type.clone(),
criteria: Some(criteria),
client_location: None,
client_fingerprint,
};
let response = self
.signaling_client
.send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
.await
.map_err(|e| ActrError::Unavailable(format!("Route candidates request failed: {e}")))?;
match response.result {
Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
Ok(InternalDiscoveryResult {
candidates: success.candidates,
})
}
Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
Err(ActrError::Unavailable(format!(
"Route candidates error {}: {}",
err.code, err.message
)))
}
None => Err(ActrError::Unavailable(
"Invalid route candidates response: missing result".to_string(),
)),
}
}
}
struct InternalDiscoveryResult {
candidates: Vec<ActrId>,
}
#[derive(Clone)]
pub(crate) struct BootstrapContextBuilder {
inproc_gate: Gate,
outproc_gate: Option<Gate>,
data_stream_registry: Arc<DataStreamRegistry>,
media_frame_registry: Arc<MediaFrameRegistry>,
signaling_client: Arc<dyn SignalingClient>,
actr_lock: Option<Arc<LockFile>>,
}
impl BootstrapContextBuilder {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
inproc_gate: Gate,
outproc_gate: Option<Gate>,
data_stream_registry: Arc<DataStreamRegistry>,
media_frame_registry: Arc<MediaFrameRegistry>,
signaling_client: Arc<dyn SignalingClient>,
actr_lock: Option<Arc<LockFile>>,
) -> Self {
Self {
inproc_gate,
outproc_gate,
data_stream_registry,
media_frame_registry,
signaling_client,
actr_lock,
}
}
pub(crate) fn build_bootstrap(
&self,
self_id: &ActrId,
credential: &AIdCredential,
) -> RuntimeContext {
RuntimeContext::new(
self_id.clone(),
None,
uuid::Uuid::new_v4().to_string(),
self.inproc_gate.clone(),
self.outproc_gate.clone(),
self.data_stream_registry.clone(),
self.media_frame_registry.clone(),
self.signaling_client.clone(),
credential.clone(),
self.actr_lock.clone(),
)
}
}
#[async_trait]
impl Context for RuntimeContext {
fn self_id(&self) -> &ActrId {
&self.self_id
}
fn caller_id(&self) -> Option<&ActrId> {
self.caller_id.as_ref()
}
fn request_id(&self) -> &str {
&self.request_id
}
#[cfg_attr(
feature = "opentelemetry",
tracing::instrument(
skip_all,
name = "RuntimeContext.call",
fields(actr_id = %self.self_id)
)
)]
async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
use actr_protocol::prost::Message as ProstMessage;
let payload: Bytes = request.encode_to_vec().into();
let route_key = R::route_key().to_string();
#[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
let mut envelope = RpcEnvelope {
route_key,
payload: Some(payload),
error: None,
traceparent: None,
tracestate: None,
request_id: uuid::Uuid::new_v4().to_string(), metadata: vec![],
timeout_ms: 30000, };
#[cfg(feature = "opentelemetry")]
inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
let response_bytes = gate
.send_request_with_type(target_id, R::payload_type(), envelope)
.await?;
R::Response::decode(&*response_bytes).map_err(|e| {
ActrError::DecodeFailure(format!(
"Failed to decode {}: {}",
std::any::type_name::<R::Response>(),
e
))
})
}
#[cfg_attr(
feature = "opentelemetry",
tracing::instrument(
skip_all,
name = "RuntimeContext.tell",
fields(actr_id = %self.self_id)
)
)]
async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
let payload: Bytes = message.encode_to_vec().into();
let route_key = R::route_key().to_string();
#[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
let mut envelope = RpcEnvelope {
route_key,
payload: Some(payload),
error: None,
traceparent: None,
tracestate: None,
request_id: uuid::Uuid::new_v4().to_string(),
metadata: vec![],
timeout_ms: 0, };
#[cfg(feature = "opentelemetry")]
inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
gate.send_message_with_type(target_id, R::payload_type(), envelope)
.await
}
async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
where
F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
{
tracing::debug!(
"📊 Registering DataStream callback for stream_id: {}",
stream_id
);
self.data_stream_registry
.register(stream_id, Arc::new(callback));
Ok(())
}
async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
tracing::debug!(
"🚫 Unregistering DataStream callback for stream_id: {}",
stream_id
);
self.data_stream_registry.unregister(stream_id);
Ok(())
}
async fn send_data_stream(
&self,
target: &Dest,
chunk: DataStream,
payload_type: actr_protocol::PayloadType,
) -> ActorResult<()> {
use actr_protocol::prost::Message as ProstMessage;
let payload = chunk.encode_to_vec();
let stream_id = chunk.stream_id.as_str();
tracing::debug!(
"📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
stream_id,
chunk.sequence,
payload.len()
);
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
gate.send_data_stream(
target_id,
payload_type,
stream_id,
bytes::Bytes::from(payload),
)
.await
}
#[cfg_attr(
feature = "opentelemetry",
tracing::instrument(
skip_all,
name = "RuntimeContext.discover_route_candidate",
fields(
actr_id = %self.self_id,
target_type = %target_type,
)
)
)]
async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
if !self.signaling_client.is_connected() {
return Err(ActrError::Unavailable(
"Signaling client is not connected.".to_string(),
));
}
let service_name = format!("{}:{}", target_type.manufacturer, target_type.name);
let client_fingerprint = match self.get_dependency_fingerprint(target_type) {
Some(fingerprint) => fingerprint,
None => {
if self.actr_lock.is_none() {
tracing::debug!(
"manifest.lock.toml not loaded; sending discovery without fingerprint for '{}'",
service_name
);
String::new()
} else {
tracing::error!(
severity = 10,
error_category = "dependency_missing",
"❌ DEPENDENCY NOT FOUND: Service '{}' is not declared in manifest.lock.toml.\n\
Please run 'actr deps install' to generate the lock file with all dependencies.",
service_name
);
return Err(ActrError::DependencyNotFound {
service_name: service_name.clone(),
message: format!(
"Dependency '{}' not found in manifest.lock.toml. Run 'actr deps install' to resolve dependencies.",
service_name
),
});
}
}
};
if !client_fingerprint.is_empty() {
tracing::debug!(
"📋 Found dependency fingerprint for '{}': {}",
service_name,
&client_fingerprint[..20.min(client_fingerprint.len())]
);
}
let result = self
.send_discovery_request(target_type, 1, client_fingerprint)
.await?;
tracing::info!(
"Discovery result [{}]: {} candidates",
service_name,
result.candidates.len(),
);
result.candidates.into_iter().next().ok_or_else(|| {
ActrError::NotFound(format!(
"No route candidates for type {}/{}",
target_type.manufacturer, target_type.name
))
})
}
async fn call_raw(
&self,
target: &ActrId,
route_key: &str,
payload: Bytes,
) -> ActorResult<Bytes> {
RuntimeContext::call_raw(
self,
&Dest::Actor(target.clone()),
route_key.to_string(),
PayloadType::RpcReliable,
payload,
30_000,
)
.await
}
async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
where
F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
{
tracing::debug!(
"📹 Registering MediaTrack callback for track_id: {}",
track_id
);
self.media_frame_registry
.register(track_id, Arc::new(callback));
Ok(())
}
async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
tracing::debug!(
"📹 Unregistering MediaTrack callback for track_id: {}",
track_id
);
self.media_frame_registry.unregister(track_id);
Ok(())
}
async fn send_media_sample(
&self,
target: &Dest,
track_id: &str,
sample: MediaSample,
) -> ActorResult<()> {
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
gate.send_media_sample(target_id, track_id, sample).await
}
async fn add_media_track(
&self,
target: &Dest,
track_id: &str,
codec: &str,
media_type: &str,
) -> ActorResult<()> {
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
gate.add_media_track(target_id, track_id, codec, media_type)
.await
}
async fn remove_media_track(&self, target: &Dest, track_id: &str) -> ActorResult<()> {
let gate = self.select_gate(target)?;
let target_id = self.extract_target_id(target);
gate.remove_media_track(target_id, track_id).await
}
}