mod component;
use std::{
cell::{Cell, RefCell},
rc::Rc,
};
use derive_more::{Display, From};
use futures::channel::mpsc;
use medea_client_api_proto::TrackId;
use tracerr::Traced;
use crate::{
media::{track::local, LocalTracksConstraints, TrackConstraints},
peer::TrackEvent,
platform,
utils::Caused,
};
use super::{
media_exchange_state, mute_state, MediaConnections,
MediaStateControllable as _,
};
#[doc(inline)]
pub use self::component::{Component, State};
/// Errors that [`Sender::new()`] may fail with.
#[derive(Caused, Clone, Debug, Display)]
#[cause(error = platform::Error)]
pub enum CreateError {
/// The media type of the [`Sender`] is required, but either its state or
/// the local send constraints demand it to be muted or disabled.
#[display(
"`MediaExchangeState` of `Sender` cannot transit to \
disabled state, because this `Sender` is required"
)]
CannotDisableRequiredSender,
/// No transceiver was found for the `mid` carried by this variant.
#[display("Unable to find Transceiver with mid: {_0}")]
TransceiverNotFound(String),
}
/// Error of inserting a [`local::Track`] into a [`Sender`].
///
/// Wraps the [`platform::Error`] reported by the [`Sender`]'s transceiver when
/// setting its send track fails.
#[derive(Caused, Clone, Debug, Display, From)]
#[cause(error = platform::Error)]
#[display("Failed to insert `local::Track` into `Sender`'s transceiver")]
pub struct InsertTrackError(platform::Error);
/// Sending side of a media track, bound to a [`platform::Transceiver`].
#[derive(Debug)]
pub struct Sender {
/// ID of the track, attached to every [`TrackEvent`] emitted by this
/// [`Sender`].
track_id: TrackId,
/// Constraints built from the media type of the [`State`] this [`Sender`]
/// was created from.
caps: TrackConstraints,
/// Transceiver the local track is sent through.
transceiver: platform::Transceiver,
/// Local track currently set on the transceiver, if any.
track: RefCell<Option<Rc<local::Track>>>,
/// Mute flag: an inserted track is enabled only while this is `false`.
muted: Cell<bool>,
/// Individual enabled flag, initialized from the [`State`].
enabled_individual: Cell<bool>,
/// General enabled flag, initialized from the [`State`].
enabled_general: Cell<bool>,
/// Local constraints consulted for the enabled/muted state of this
/// [`Sender`]'s media type.
send_constraints: LocalTracksConstraints,
/// Channel the [`TrackEvent`] intentions of this [`Sender`] are sent into.
track_events_sender: mpsc::UnboundedSender<TrackEvent>,
}
impl Sender {
    /// Creates a new [`Sender`] for the provided [`State`].
    ///
    /// The underlying [`platform::Transceiver`] is picked as follows:
    /// - if the [`State`] has a `mid`, the transceiver with this `mid` is
    ///   looked up in the provided [`MediaConnections`];
    /// - otherwise, the transceiver of an existing receiver with the same
    ///   media kind and media source kind is reused, if such receiver has
    ///   one;
    /// - otherwise, a new transceiver is added to the [`MediaConnections`]
    ///   with the [`platform::TransceiverDirection::INACTIVE`] direction.
    ///
    /// After construction, the provided `send_constraints` are applied to the
    /// [`State`]: its media exchange state transits to disabled if the
    /// constraints disable this media type, and its mute state transits to
    /// muted if the constraints mute it.
    ///
    /// # Errors
    ///
    /// With a [`CreateError::CannotDisableRequiredSender`] if the media type
    /// is required, but the [`State`] or the `send_constraints` make it muted
    /// or disabled.
    ///
    /// With a [`CreateError::TransceiverNotFound`] if the [`State`] has a
    /// `mid`, but no transceiver with this `mid` is found.
    pub async fn new(
        state: &State,
        media_connections: &MediaConnections,
        send_constraints: LocalTracksConstraints,
        track_events_sender: mpsc::UnboundedSender<TrackEvent>,
    ) -> Result<Rc<Self>, Traced<CreateError>> {
        let enabled_in_cons = send_constraints.enabled(state.media_type());
        let muted_in_cons = send_constraints.muted(state.media_type());
        let media_disabled = state.is_muted()
            || !state.is_enabled_individual()
            || !enabled_in_cons
            || muted_in_cons;
        if state.media_type().required() && media_disabled {
            return Err(tracerr::new!(
                CreateError::CannotDisableRequiredSender
            ));
        }

        let caps = TrackConstraints::from(state.media_type().clone());
        let transceiver = match state.mid() {
            None => {
                // Prefer reusing the transceiver of a receiver carrying the
                // same kind of media.
                let transceiver = media_connections
                    .0
                    .borrow()
                    .receivers
                    .values()
                    .find(|rcvr| {
                        rcvr.caps().media_kind() == caps.media_kind()
                            && rcvr.caps().media_source_kind()
                                == caps.media_source_kind()
                    })
                    .and_then(|rcvr| rcvr.transceiver());
                if let Some(trcv) = transceiver {
                    trcv
                } else {
                    // The future is built in its own statement, so the borrow
                    // of the connections ends before it is awaited.
                    let add_transceiver =
                        media_connections.0.borrow().add_transceiver(
                            state.media_type().clone(),
                            platform::TransceiverDirection::INACTIVE,
                        );
                    add_transceiver.await
                }
            }
            Some(mid) => {
                // Same as above: no borrow is held across the `.await`.
                let get_transceiver = media_connections
                    .0
                    .borrow()
                    .get_transceiver_by_mid(mid.into());
                get_transceiver
                    .await
                    .ok_or_else(|| CreateError::TransceiverNotFound(mid.into()))
                    .map_err(tracerr::wrap!())?
            }
        };

        let this = Rc::new(Self {
            track_id: state.id(),
            caps,
            transceiver,
            enabled_general: Cell::new(state.is_enabled_general()),
            enabled_individual: Cell::new(state.is_enabled_individual()),
            muted: Cell::new(state.is_muted()),
            track_events_sender,
            send_constraints,
            track: RefCell::new(None),
        });

        // Constraints may only force this `Sender` into the disabled state.
        // When they allow the media type, the media exchange state of the
        // `State` must stay untouched: otherwise an individually disabled
        // (non-required) `Sender` would be pushed back to enabled.
        if !enabled_in_cons {
            state.media_exchange_state_controller().transition_to(
                media_exchange_state::Stable::from(enabled_in_cons),
            );
        }
        // Likewise, constraints may only force the muted state.
        if muted_in_cons {
            state
                .mute_state_controller()
                .transition_to(mute_state::Stable::from(muted_in_cons));
        }

        Ok(this)
    }

    /// Returns the [`TrackConstraints`] of this [`Sender`].
    pub const fn caps(&self) -> &TrackConstraints {
        &self.caps
    }

    /// Indicates whether the transceiver of this [`Sender`] has the
    /// [`platform::TransceiverDirection::SEND`] direction.
    pub async fn is_publishing(&self) -> bool {
        self.transceiver
            .has_direction(platform::TransceiverDirection::SEND)
            .await
    }

    /// Drops the [`local::Track`] held by this [`Sender`] and resets the send
    /// track of its transceiver to [`None`].
    ///
    /// The result of resetting the send track is deliberately discarded.
    pub async fn remove_track(&self) {
        drop(self.track.take());
        drop(self.transceiver.set_send_track(None).await);
    }

    /// Indicates whether this [`Sender`] holds a [`local::Track`].
    #[must_use]
    pub fn has_track(&self) -> bool {
        self.track.borrow().is_some()
    }

    /// Inserts a fork of the provided [`local::Track`] into the transceiver
    /// of this [`Sender`].
    ///
    /// No-op if the ID of the provided track equals the ID of the track
    /// already held by this [`Sender`].
    ///
    /// # Errors
    ///
    /// With an [`InsertTrackError`] if the transceiver fails to set the
    /// forked track as its send track. In this case the previously held track
    /// is kept.
    pub(super) async fn insert_track(
        self: Rc<Self>,
        new_track: Rc<local::Track>,
    ) -> Result<(), Traced<InsertTrackError>> {
        // Evaluated in a single statement, so the borrow of `self.track` is
        // released before any `.await` below.
        let is_same_track = self
            .track
            .borrow()
            .as_ref()
            .is_some_and(|current_track| new_track.id() == current_track.id());
        if is_same_track {
            return Ok(());
        }

        let new_track = Rc::new(new_track.fork().await);

        // Make sure a muted `Sender` never starts sending an enabled track.
        new_track.set_enabled(!self.muted.get());

        self.transceiver
            .set_send_track(Some(&new_track))
            .await
            .map_err(InsertTrackError::from)
            .map_err(tracerr::wrap!())?;

        // NOTE(review): applied once more after the `.await`, presumably
        // because the `muted` flag may change while the track is being set;
        // confirm.
        new_track.set_enabled(!self.muted.get());

        drop(self.track.replace(Some(new_track)));

        Ok(())
    }

    /// Returns a clone of the [`platform::Transceiver`] of this [`Sender`].
    #[must_use]
    pub fn transceiver(&self) -> platform::Transceiver {
        self.transceiver.clone()
    }

    /// Returns the [`local::Track`] held by this [`Sender`], if any.
    #[must_use]
    pub fn get_send_track(&self) -> Option<Rc<local::Track>> {
        self.track.borrow().clone()
    }

    /// Returns the `mid` of the transceiver of this [`Sender`], if it has
    /// one.
    #[must_use]
    pub fn mid(&self) -> Option<String> {
        self.transceiver.mid()
    }

    /// Asks the send constraints of this [`Sender`] whether a track of its
    /// media kind and media source kind is enabled and constrained.
    fn enabled_in_cons(&self) -> bool {
        self.send_constraints.is_track_enabled_and_constrained(
            self.caps.media_kind(),
            Some(self.caps.media_source_kind()),
        )
    }

    /// Emits a [`TrackEvent::MediaExchangeIntention`] for the track of this
    /// [`Sender`].
    ///
    /// The event's `enabled` flag is `true` only for the
    /// [`media_exchange_state::Transition::Enabling`] transition. A failure to
    /// send the event is ignored.
    pub fn send_media_exchange_state_intention(
        &self,
        state: media_exchange_state::Transition,
    ) {
        _ = self.track_events_sender.unbounded_send(
            TrackEvent::MediaExchangeIntention {
                id: self.track_id,
                enabled: matches!(
                    state,
                    media_exchange_state::Transition::Enabling(_)
                ),
            },
        );
    }

    /// Emits a [`TrackEvent::MuteUpdateIntention`] for the track of this
    /// [`Sender`].
    ///
    /// The event's `muted` flag is `true` only for the
    /// [`mute_state::Transition::Muting`] transition. A failure to send the
    /// event is ignored.
    pub fn send_mute_state_intention(&self, state: mute_state::Transition) {
        _ = self.track_events_sender.unbounded_send(
            TrackEvent::MuteUpdateIntention {
                id: self.track_id,
                muted: matches!(state, mute_state::Transition::Muting(_)),
            },
        );
    }
}
#[cfg(feature = "mockable")]
#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
impl Sender {
    /// Tells whether the general enabled flag of this [`Sender`] is unset.
    #[must_use]
    pub fn general_disabled(&self) -> bool {
        let is_enabled = self.enabled_general.get();
        !is_enabled
    }

    /// Tells whether the individual enabled flag of this [`Sender`] is unset.
    #[must_use]
    pub fn disabled(&self) -> bool {
        let is_enabled = self.enabled_individual.get();
        !is_enabled
    }

    /// Tells whether the mute flag of this [`Sender`] is set.
    #[must_use]
    pub fn muted(&self) -> bool {
        Cell::get(&self.muted)
    }
}
impl Drop for Sender {
    /// Spawns a task that stops sending through the transceiver of this
    /// [`Sender`] and clears its send track, unless the transceiver is
    /// already stopped.
    ///
    /// The cleanup is asynchronous, so it is handed to [`platform::spawn()`]
    /// together with a clone of the transceiver.
    fn drop(&mut self) {
        let trnscvr = self.transceiver.clone();
        platform::spawn(async move {
            // Nothing to clean up on a stopped transceiver.
            if trnscvr.is_stopped() {
                return;
            }
            trnscvr.set_send(false).await;
            // The outcome of clearing the send track is intentionally ignored.
            drop(trnscvr.set_send_track(None).await);
        });
    }
}