use super::coordinator::WebRtcCoordinator;
use crate::inbound::DataStreamRegistry;
#[cfg(feature = "opentelemetry")]
use crate::wire::webrtc::trace::set_parent_from_rpc_envelope;
use actr_framework::Bytes;
use actr_protocol::prost::Message as ProstMessage;
use actr_protocol::{self, ActrId, DataStream, PayloadType, RpcEnvelope};
use actr_protocol::{ActorResult, ActrError};
use actr_runtime_mailbox::{Mailbox, MessagePriority};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, oneshot};
/// Shared, lock-protected table of RPC requests that are still awaiting a response.
///
/// Keys are request ids (looked up with `RpcEnvelope::request_id` when a message
/// arrives). Each value pairs the `ActrId` recorded for the request (logged as the
/// request's target) with the oneshot sender used to deliver the eventual result.
type PendingRequestsMap =
Arc<RwLock<HashMap<String, (ActrId, oneshot::Sender<actr_protocol::ActorResult<Bytes>>)>>>;
#[cfg(feature = "opentelemetry")]
use tracing::Instrument as _;
/// Gate between the WebRTC coordinator and the local actor runtime.
///
/// Inbound messages pulled from the coordinator are either matched against a
/// pending RPC request, enqueued into a mailbox, or dispatched to the data-stream
/// registry; outbound RPC responses are encoded and sent through the coordinator.
pub(crate) struct WebRtcGate {
/// Identity of the local actor; `None` until `set_local_id` is called.
/// In the code visible here it is only read to tag tracing spans when the
/// `opentelemetry` feature is enabled.
local_id: Arc<RwLock<Option<ActrId>>>,
/// Transport handle used to receive inbound messages and send responses.
coordinator: Arc<WebRtcCoordinator>,
/// In-flight RPC requests, keyed by request id, awaiting their response.
pending_requests: PendingRequestsMap,
/// Receives decoded `DataStream` chunks together with the sender's `ActrId`.
data_stream_registry: Arc<DataStreamRegistry>,
}
impl WebRtcGate {
/// Creates a gate wired to the given coordinator, pending-request table and
/// data-stream registry.
///
/// The local actor id starts out unset; call `set_local_id` once it is known.
pub fn new(
    coordinator: Arc<WebRtcCoordinator>,
    pending_requests: PendingRequestsMap,
    data_stream_registry: Arc<DataStreamRegistry>,
) -> Self {
    // No identity is available at construction time.
    let local_id = Arc::new(RwLock::new(None));

    Self {
        local_id,
        coordinator,
        pending_requests,
        data_stream_registry,
    }
}
/// Records `actor_id` as this gate's local identity, replacing any earlier value.
pub async fn set_local_id(&self, actor_id: ActrId) {
    let mut slot = self.local_id.write().await;
    *slot = Some(actor_id);
}
async fn handle_envelope(
envelope: RpcEnvelope,
from_bytes: Vec<u8>,
data: Bytes,
payload_type: PayloadType,
pending_requests: PendingRequestsMap,
mailbox: Arc<dyn Mailbox>,
) {
let request_id = envelope.request_id.clone();
let mut pending = pending_requests.write().await;
if let Some((target, response_tx)) = pending.remove(&request_id) {
drop(pending); tracing::debug!(
"📬 Received RPC Response: request_id={}, target={}",
request_id,
target
);
let result = match (envelope.payload, envelope.error) {
(Some(payload), None) => Ok(payload),
(None, Some(error)) => Err(ActrError::Unavailable(format!(
"RPC error {}: {}",
error.code, error.message
))),
_ => Err(ActrError::DecodeFailure(
"Invalid RpcEnvelope: payload and error fields inconsistent".to_string(),
)),
};
let _ = response_tx.send(result);
} else {
drop(pending); tracing::debug!("📥 Received RPC Request: request_id={}", request_id);
let priority = match payload_type {
PayloadType::RpcSignal => MessagePriority::High,
PayloadType::RpcReliable => MessagePriority::Normal,
_ => MessagePriority::Normal,
};
tracing::info!(request_id = %request_id, "rpc.mailbox.enqueue");
match mailbox.enqueue(from_bytes, data.to_vec(), priority).await {
Ok(msg_id) => {
tracing::debug!(
"✅ RPC message enqueued to Mailbox: msg_id={}, priority={:?}",
msg_id,
priority
);
}
Err(e) => {
tracing::error!("❌ Mailbox enqueue failed: {:?}", e);
}
}
}
}
pub async fn start_receive_loop(&self, mailbox: Arc<dyn Mailbox>) -> ActorResult<()> {
let coordinator = self.coordinator.clone();
let pending_requests = self.pending_requests.clone();
let data_stream_registry = self.data_stream_registry.clone();
#[cfg(feature = "opentelemetry")]
let local_id = self.local_id.clone();
tokio::spawn(async move {
loop {
match coordinator.receive_message().await {
Ok(Some((from_bytes, data, payload_type))) => {
tracing::debug!(
"📨 WebRtcGate received message: {} bytes, PayloadType: {:?}",
data.len(),
payload_type
);
match payload_type {
PayloadType::RpcReliable | PayloadType::RpcSignal => {
match RpcEnvelope::decode(&data[..]) {
Ok(envelope) => {
#[cfg(feature = "opentelemetry")]
let current_local_id = local_id.read().await.clone();
#[cfg(feature = "opentelemetry")]
let span = {
let actr_id_str = current_local_id
.as_ref()
.map(|id| id.to_string())
.unwrap_or_default();
let span = tracing::info_span!("WebRtcGate.receive_rpc", actr_id = %actr_id_str);
set_parent_from_rpc_envelope(&span, &envelope);
span
};
let handle_envelope_fut = Self::handle_envelope(
envelope,
from_bytes,
data,
payload_type,
pending_requests.clone(),
mailbox.clone(),
);
#[cfg(feature = "opentelemetry")]
let handle_envelope_fut =
handle_envelope_fut.instrument(span);
handle_envelope_fut.await;
}
Err(e) => {
tracing::error!(
"❌ Failed to deserialize RpcEnvelope: {:?}",
e
);
}
}
}
PayloadType::StreamReliable | PayloadType::StreamLatencyFirst => {
match DataStream::decode(&data[..]) {
Ok(chunk) => {
tracing::debug!(
"📦 Received DataStream: stream_id={}, seq={}, {} bytes",
chunk.stream_id,
chunk.sequence,
chunk.payload.len()
);
match ActrId::decode(&from_bytes[..]) {
Ok(sender_id) => {
data_stream_registry
.dispatch(chunk, sender_id)
.await;
}
Err(e) => {
tracing::error!(
"❌ Failed to decode sender ActrId: {:?}",
e
);
}
}
}
Err(e) => {
tracing::error!(
"❌ Failed to deserialize DataStream: {:?}",
e
);
}
}
}
PayloadType::MediaRtp => {
tracing::warn!(
"⚠️ MediaRtp received in WebRtcGate (should use RTCTrackRemote)"
);
}
}
}
Ok(None) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Err(e) => {
tracing::error!("❌ Message receive failed: {:?}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}
});
Ok(())
}
/// Encodes `response_envelope` and sends it to `target` through the coordinator.
///
/// With the `opentelemetry` feature the call is instrumented as
/// `WebRtcGate.send_response`, and the span's `actr_id` field is filled in when a
/// local id has been set.
///
/// # Errors
///
/// Returns `ActrError::Internal` if the envelope cannot be encoded, and propagates
/// any error returned by the coordinator's `send_message`.
#[cfg_attr(feature = "opentelemetry", tracing::instrument(
skip_all,
name = "WebRtcGate.send_response",
fields(actr_id = tracing::field::Empty)
))]
pub async fn send_response(
    &self,
    target: &ActrId,
    response_envelope: RpcEnvelope,
) -> ActorResult<()> {
    #[cfg(feature = "opentelemetry")]
    {
        // The read guard lives only for this block.
        let guard = self.local_id.read().await;
        if let Some(ref own_id) = *guard {
            tracing::Span::current().record("actr_id", tracing::field::display(own_id));
        }
    }

    let mut wire_bytes = Vec::new();
    if let Err(e) = response_envelope.encode(&mut wire_bytes) {
        return Err(ActrError::Internal(format!("Failed to encode response: {e}")));
    }

    self.coordinator.send_message(target, &wire_bytes).await?;

    let sent_len = wire_bytes.len();
    tracing::debug!(
        "📤 Sent response: request_id={}, {} bytes",
        response_envelope.request_id,
        sent_len
    );
    Ok(())
}
}