mod component;
use std::cell::{Cell, RefCell};
use futures::channel::mpsc;
use medea_client_api_proto as proto;
use proto::{ConnectionMode, TrackId};
#[doc(inline)]
pub use self::component::{Component, State};
use super::TransceiverSide as _;
use crate::{
media::{MediaDirection, RecvConstraints, TrackConstraints, track::remote},
peer::{
MediaConnections, MediaStateControllable as _, PeerEvent, TrackEvent,
media::media_exchange_state,
},
platform, utils,
};
#[derive(Debug)]
pub struct Receiver {
track_id: TrackId,
connection_mode: ConnectionMode,
caps: TrackConstraints,
sender_id: proto::MemberId,
transceiver: RefCell<Option<platform::Transceiver>>,
mid: RefCell<Option<String>>,
track: RefCell<Option<remote::Track>>,
is_track_notified: Cell<bool>,
enabled_general: Cell<bool>,
enabled_individual: Cell<bool>,
media_direction: Cell<MediaDirection>,
muted: Cell<bool>,
peer_events_sender: mpsc::UnboundedSender<PeerEvent>,
track_events_sender: mpsc::UnboundedSender<TrackEvent>,
}
impl Receiver {
pub async fn new(
state: &State,
media_connections: &MediaConnections,
track_events_sender: mpsc::UnboundedSender<TrackEvent>,
recv_constraints: &RecvConstraints,
connection_mode: ConnectionMode,
) -> Self {
let caps = TrackConstraints::from(state.media_type().clone());
#[expect(clippy::if_then_some_else_none, reason = "more readable")]
let transceiver = if state.mid().is_none() {
let sender = media_connections
.0
.borrow()
.senders
.values()
.find(|sndr| {
sndr.caps().media_kind() == caps.media_kind()
&& sndr.caps().media_source_kind()
== caps.media_source_kind()
})
.map(utils::component::Component::obj);
let trnsvr = if let Some(s) = sender {
s.transceiver()
} else {
let new_transceiver =
media_connections.0.borrow().add_transceiver(
state.media_type().clone(),
platform::TransceiverDirection::INACTIVE,
);
new_transceiver.await
};
trnsvr
.set_recv(match connection_mode {
ConnectionMode::Mesh => state.enabled_individual(),
ConnectionMode::Sfu => true,
})
.await;
Some(trnsvr)
} else {
None
};
let peer_events_sender =
media_connections.0.borrow().peer_events_sender.clone();
let this = Self {
track_id: state.track_id(),
connection_mode,
caps,
sender_id: state.sender_id().clone(),
transceiver: RefCell::new(transceiver),
mid: RefCell::new(state.mid().map(ToString::to_string)),
track: RefCell::new(None),
is_track_notified: Cell::new(false),
peer_events_sender,
enabled_general: Cell::new(state.enabled_individual()),
enabled_individual: Cell::new(state.enabled_general()),
muted: Cell::new(state.muted()),
media_direction: Cell::new(state.media_direction()),
track_events_sender,
};
let enabled_in_cons = match &state.media_type() {
proto::MediaType::Audio(_) => recv_constraints.is_audio_enabled(),
proto::MediaType::Video(_) => {
recv_constraints.is_video_device_enabled()
|| recv_constraints.is_video_display_enabled()
}
};
if !enabled_in_cons {
state
.media_exchange_state_controller()
.transition_to(enabled_in_cons.into());
}
this
}
#[must_use]
pub const fn caps(&self) -> &TrackConstraints {
&self.caps
}
#[must_use]
pub fn mid(&self) -> Option<String> {
if self.mid.borrow().is_none() && self.transceiver.borrow().is_some() {
if let Some(transceiver) =
self.transceiver.borrow().as_ref().cloned()
{
drop(self.mid.replace(Some(transceiver.mid()?)));
}
}
self.mid.borrow().clone()
}
pub async fn is_receiving(&self) -> bool {
let transceiver = self.transceiver.borrow().clone();
let is_recv_direction = if let Some(trcv) = transceiver {
trcv.has_direction(platform::TransceiverDirection::RECV).await
} else {
false
};
self.enabled_individual.get() && is_recv_direction
}
pub fn send_media_exchange_state_intention(
&self,
state: media_exchange_state::Transition,
) {
_ = self.track_events_sender.unbounded_send(
TrackEvent::MediaExchangeIntention {
id: self.track_id,
enabled: matches!(
state,
media_exchange_state::Transition::Enabling(_)
),
},
);
}
pub async fn set_remote_track(
&self,
transceiver: platform::Transceiver,
new_track: platform::MediaStreamTrack,
) {
if let Some(old_track) = self.track.borrow().as_ref() {
if old_track.id() == new_track.id() {
return;
}
}
self.set_transceiver(transceiver);
let new_track = remote::Track::new(
new_track,
self.caps.media_source_kind(),
self.muted.get(),
self.media_direction.get(),
);
if let Some(prev_track) = self.track.replace(Some(new_track)) {
platform::spawn(async move {
prev_track.stop().await;
});
}
let trnscvr = self.transceiver.borrow().as_ref().cloned();
if let Some(t) = trnscvr {
t.set_recv(match self.connection_mode {
ConnectionMode::Mesh => self.enabled_individual.get(),
ConnectionMode::Sfu => true,
})
.await;
}
self.maybe_notify_track().await;
}
pub fn set_media_direction(&self, direction: MediaDirection) {
self.media_direction.set(direction);
if let Some(track) = self.track.borrow().as_ref().cloned() {
track.set_media_direction(direction);
}
}
pub fn set_transceiver(&self, transceiver: platform::Transceiver) {
if self.transceiver.borrow().is_none()
&& self.mid.borrow().as_ref() == transceiver.mid().as_ref()
{
drop(self.transceiver.replace(Some(transceiver)));
}
}
pub fn transceiver(&self) -> Option<platform::Transceiver> {
self.transceiver.borrow().clone()
}
async fn maybe_notify_track(&self) {
if self.is_track_notified.get() {
return;
}
if !self.is_receiving().await {
return;
}
if self.is_track_notified.get() {
return;
}
if let Some(track) = self.track.borrow().as_ref() {
drop(self.peer_events_sender.unbounded_send(
PeerEvent::NewRemoteTrack {
sender_id: self.sender_id.clone(),
track: track.clone(),
},
));
self.is_track_notified.set(true);
}
}
}
#[cfg(feature = "mockable")]
#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
impl Receiver {
#[must_use]
pub fn enabled_general(&self) -> bool {
self.enabled_general.get()
}
#[must_use]
pub fn direction(&self) -> MediaDirection {
self.media_direction.get()
}
}
impl Drop for Receiver {
fn drop(&mut self) {
let transceiver = self.transceiver.borrow_mut().take();
if let Some(transceiver) = transceiver {
platform::spawn(async move {
if !transceiver.is_stopped() {
transceiver.set_recv(false).await;
}
});
}
if let Some(recv_track) = self.track.borrow_mut().take() {
platform::spawn(recv_track.stop());
}
}
}