use std::rc::Rc;
use futures::StreamExt as _;
use medea_client_api_proto as proto;
use medea_client_api_proto::{
MediaSourceKind, MediaType, MemberId, TrackId, TrackPatchEvent,
};
use medea_macro::watchers;
use medea_reactive::{
when_all_processed, AllProcessed, Guarded, ObservableCell, Processed,
ProgressableCell,
};
use crate::{
media::LocalTracksConstraints,
peer::{
component::SyncState,
media::{
transitable_state::media_exchange_state, InTransition, Result,
},
MediaExchangeState, MediaExchangeStateController,
MediaStateControllable, MuteStateController, TransceiverDirection,
TransceiverSide,
},
utils::{component, AsProtoState, SynchronizableState, Updatable},
MediaKind,
};
use super::Receiver;
/// Component pairing the reactive [`State`] with the [`Receiver`] it drives.
pub type Component = component::Component<State, Receiver>;
/// State of a [`Receiver`], observed by the watchers of the [`Component`].
#[derive(Debug)]
pub struct State {
/// ID of the track this [`State`] describes (exposed via `id()` and
/// `track_id()`).
id: TrackId,
/// Optional `mid` string of this track.
///
/// NOTE(review): presumably the media ID of the underlying transceiver —
/// confirm against the code assigning it.
mid: Option<String>,
/// [`MediaType`] (audio or video) of the received track.
media_type: MediaType,
/// [`MemberId`] stored for this track.
///
/// NOTE(review): by its name, the ID of the member sending the track —
/// confirm.
sender_id: MemberId,
/// Controller of the individual media exchange state; its stable and
/// transition updates are both watched by the [`Component`].
enabled_individual: Rc<MediaExchangeStateController>,
/// General media exchange state; every change is propagated to the
/// [`Receiver`] by a watcher.
enabled_general: ProgressableCell<media_exchange_state::Stable>,
/// Mute flag, forwarded to the [`Receiver`] and its track by a watcher.
muted: ObservableCell<bool>,
/// Synchronization state: set to `Desynced` on connection loss, `Syncing`
/// on connection recovery and `Synced` once a state has been applied.
sync_state: ObservableCell<SyncState>,
}
impl AsProtoState for State {
    type Output = proto::state::Receiver;

    /// Builds a [`proto::state::Receiver`] snapshot out of this [`State`].
    ///
    /// Identifiers and the media type are cloned as they are, while both
    /// `enabled_*` flags are taken from the corresponding getters.
    #[inline]
    fn as_proto(&self) -> Self::Output {
        let enabled_individual = self.enabled_individual();
        let enabled_general = self.enabled_general();

        proto::state::Receiver {
            id: self.id,
            mid: self.mid.clone(),
            media_type: self.media_type.clone(),
            sender_id: self.sender_id.clone(),
            enabled_individual,
            enabled_general,
            // NOTE(review): always reported as `false`, even though
            // `self.muted` is tracked by this `State` - confirm whether this
            // is intentional before changing it.
            muted: false,
        }
    }
}
impl SynchronizableState for State {
    type Input = proto::state::Receiver;

    /// Creates a new [`State`] from the provided [`proto::state::Receiver`].
    ///
    /// The resulting [`State`] starts in the [`SyncState::Synced`] state.
    /// The [`LocalTracksConstraints`] argument is not used.
    #[inline]
    fn from_proto(input: Self::Input, _: &LocalTracksConstraints) -> Self {
        let proto::state::Receiver {
            id,
            mid,
            media_type,
            sender_id,
            enabled_individual,
            enabled_general,
            muted,
        } = input;

        Self {
            id,
            mid,
            media_type,
            sender_id,
            enabled_individual: MediaExchangeStateController::new(
                enabled_individual.into(),
            ),
            enabled_general: ProgressableCell::new(enabled_general.into()),
            muted: ObservableCell::new(muted),
            sync_state: ObservableCell::new(SyncState::Synced),
        }
    }

    /// Applies the provided [`proto::state::Receiver`] to this [`State`].
    ///
    /// The individual media exchange state is updated only when it differs
    /// from the stable value currently held by the controller (for an
    /// in-flight transition, the value returned by `into_inner()`). The
    /// general state is always overwritten, and the [`State`] is marked as
    /// [`SyncState::Synced`] afterwards.
    ///
    /// Only the `enabled_individual` and `enabled_general` fields of the
    /// `input` are read here.
    fn apply(&self, input: Self::Input, _: &LocalTracksConstraints) {
        let target_individual =
            media_exchange_state::Stable::from(input.enabled_individual);
        let known_individual = match self.enabled_individual.state() {
            MediaExchangeState::Stable(stable) => stable,
            MediaExchangeState::Transition(transition) => {
                transition.into_inner()
            }
        };
        if known_individual != target_individual {
            self.enabled_individual.update(target_individual);
        }

        let target_general =
            media_exchange_state::Stable::from(input.enabled_general);
        self.enabled_general.set(target_general);

        self.sync_state.set(SyncState::Synced);
    }
}
impl Updatable for State {
    /// Returns an [`AllProcessed`] built around the `when_stabilized()`
    /// future of the individual media exchange state controller.
    #[inline]
    fn when_stabilized(&self) -> AllProcessed<'static> {
        let individual = Rc::clone(&self.enabled_individual);
        let stabilized = Processed::new(Box::new(move || {
            // A fresh handle per invocation, so the closure itself keeps
            // owning `individual` and can be called again.
            let individual = Rc::clone(&individual);
            Box::pin(async move {
                individual.when_stabilized().await;
            })
        }));

        when_all_processed(std::iter::once(stabilized.into()))
    }

    /// Returns an [`AllProcessed`] combining the pending processing of the
    /// individual media exchange state controller and of the general media
    /// exchange state cell.
    #[inline]
    fn when_updated(&self) -> AllProcessed<'static> {
        when_all_processed(vec![
            self.enabled_individual.when_processed().into(),
            self.enabled_general.when_all_processed().into(),
        ])
    }

    /// Marks this [`State`] as [`SyncState::Desynced`].
    #[inline]
    fn connection_lost(&self) {
        self.sync_state.set(SyncState::Desynced);
    }

    /// Marks this [`State`] as [`SyncState::Syncing`].
    #[inline]
    fn connection_recovered(&self) {
        self.sync_state.set(SyncState::Syncing);
    }
}
impl From<&State> for proto::state::Receiver {
    /// Converts the provided [`State`] into a [`proto::state::Receiver`].
    ///
    /// Delegates to [`AsProtoState::as_proto()`], which builds exactly the
    /// same representation.
    #[inline]
    fn from(from: &State) -> Self {
        from.as_proto()
    }
}
impl State {
    /// Creates a new [`State`] with the provided identifiers and
    /// [`MediaType`].
    ///
    /// Both media exchange states start as `Enabled`, the track starts
    /// unmuted and the [`State`] is considered [`SyncState::Synced`].
    #[inline]
    #[must_use]
    pub fn new(
        id: TrackId,
        mid: Option<String>,
        media_type: MediaType,
        sender: MemberId,
    ) -> Self {
        let enabled = media_exchange_state::Stable::Enabled;
        let enabled_individual = MediaExchangeStateController::new(enabled);
        let enabled_general =
            ProgressableCell::new(media_exchange_state::Stable::Enabled);

        Self {
            id,
            mid,
            media_type,
            sender_id: sender,
            enabled_individual,
            enabled_general,
            muted: ObservableCell::new(false),
            sync_state: ObservableCell::new(SyncState::Synced),
        }
    }

    /// Returns the [`TrackId`] of this [`State`].
    #[inline]
    #[must_use]
    pub fn id(&self) -> TrackId {
        self.id
    }

    /// Returns the `mid` of this [`State`], if there is one.
    #[inline]
    #[must_use]
    pub fn mid(&self) -> Option<&str> {
        self.mid.as_deref()
    }

    /// Returns the [`MediaType`] of this [`State`].
    #[inline]
    #[must_use]
    pub fn media_type(&self) -> &MediaType {
        &self.media_type
    }

    /// Returns the [`MemberId`] stored as the sender of this [`State`].
    #[inline]
    #[must_use]
    pub fn sender_id(&self) -> &MemberId {
        &self.sender_id
    }

    /// Indicates whether the individual media exchange state controller
    /// reports itself as enabled.
    #[inline]
    #[must_use]
    pub fn enabled_individual(&self) -> bool {
        self.enabled_individual.enabled()
    }

    /// Indicates whether the general media exchange state is currently
    /// `Enabled`.
    #[inline]
    #[must_use]
    pub fn enabled_general(&self) -> bool {
        let current = self.enabled_general.get();
        current == media_exchange_state::Stable::Enabled
    }

    /// Returns the current value of the mute flag.
    #[inline]
    #[must_use]
    pub fn muted(&self) -> bool {
        self.muted.get()
    }

    /// Applies the provided [`TrackPatchEvent`] to this [`State`].
    ///
    /// Patches addressed to another [`TrackId`] are ignored. Only the fields
    /// present in the patch are applied, in this order: general media
    /// exchange state, individual media exchange state, mute flag.
    pub fn update(&self, track_patch: &TrackPatchEvent) {
        if self.id != track_patch.id {
            // This patch is meant for some other track.
            return;
        }

        if let Some(general) = track_patch.enabled_general {
            self.enabled_general.set(general.into());
        }
        if let Some(individual) = track_patch.enabled_individual {
            self.enabled_individual.update(individual.into());
        }
        if let Some(is_muted) = track_patch.muted {
            self.muted.set(is_muted);
        }
    }
}
#[watchers]
impl Component {
    /// Watcher for the updates of the general media exchange state.
    ///
    /// Stores the new flag in the [`Receiver`], enables or disables its
    /// track (if any), adds or subtracts the `RECV` direction of its
    /// transceiver (if any), and finally calls
    /// `Receiver::maybe_notify_track()`.
    #[watch(self.enabled_general.subscribe())]
    async fn general_media_exchange_state_changed(
        receiver: Rc<Receiver>,
        _: Rc<State>,
        state: Guarded<media_exchange_state::Stable>,
    ) -> Result<()> {
        // `_guard` is kept alive until this watcher returns.
        let (state, _guard) = state.into_parts();
        let enabled = state == media_exchange_state::Stable::Enabled;

        receiver.enabled_general.set(enabled);

        if let Some(track) = receiver.track.borrow().as_ref() {
            track.set_enabled(enabled);
        }
        if let Some(transceiver) = receiver.transceiver.borrow().as_ref() {
            if enabled {
                transceiver.add_direction(TransceiverDirection::RECV);
            } else {
                transceiver.sub_direction(TransceiverDirection::RECV);
            }
        }

        receiver.maybe_notify_track();

        Ok(())
    }

    /// Watcher for the stable updates of the individual media exchange
    /// state.
    ///
    /// Mirrors the new stable state into the [`Receiver`] as a boolean flag.
    #[inline]
    #[watch(self.enabled_individual.subscribe_stable())]
    async fn enabled_individual_stable_state_changed(
        receiver: Rc<Receiver>,
        _: Rc<State>,
        state: media_exchange_state::Stable,
    ) -> Result<()> {
        let enabled = state == media_exchange_state::Stable::Enabled;
        receiver.enabled_individual.set(enabled);

        Ok(())
    }

    /// Watcher for the started transitions of the individual media exchange
    /// state.
    ///
    /// Passes the started transition to
    /// `Receiver::send_media_exchange_state_intention()`.
    #[inline]
    #[watch(self.enabled_individual.subscribe_transition())]
    async fn enabled_individual_transition_started(
        receiver: Rc<Receiver>,
        _: Rc<State>,
        state: media_exchange_state::Transition,
    ) -> Result<()> {
        receiver.send_media_exchange_state_intention(state);

        Ok(())
    }

    /// Watcher for the updates of the mute flag.
    ///
    /// Stores the flag in the [`Receiver`] and propagates it to the
    /// [`Receiver`]'s track, if there is one.
    #[inline]
    #[watch(self.muted.subscribe())]
    async fn mute_state_changed(
        receiver: Rc<Receiver>,
        _: Rc<State>,
        muted: bool,
    ) -> Result<()> {
        receiver.muted.set(muted);

        let track = receiver.track.borrow();
        if let Some(track) = track.as_ref() {
            track.set_muted(muted);
        }

        Ok(())
    }

    /// Watcher for the [`SyncState`] updates; the first item of the
    /// subscription is skipped.
    ///
    /// - On [`SyncState::Synced`]: re-sends the intention of an individual
    ///   transition that is still in progress (if any), then resets the
    ///   transition timeout.
    /// - On [`SyncState::Desynced`]: stops the transition timeout.
    /// - On [`SyncState::Syncing`]: does nothing.
    #[watch(self.sync_state.subscribe().skip(1))]
    async fn sync_state_watcher(
        receiver: Rc<Receiver>,
        state: Rc<State>,
        sync_state: SyncState,
    ) -> Result<()> {
        let individual = &state.enabled_individual;

        match sync_state {
            SyncState::Syncing => (),
            SyncState::Desynced => individual.stop_transition_timeout(),
            SyncState::Synced => {
                if let MediaExchangeState::Transition(pending) =
                    individual.state()
                {
                    receiver.send_media_exchange_state_intention(pending);
                }
                individual.reset_transition_timeout();
            }
        }

        Ok(())
    }
}
impl MediaStateControllable for State {
    /// Returns a new handle to the controller of the individual media
    /// exchange state.
    #[inline]
    fn media_exchange_state_controller(
        &self,
    ) -> Rc<MediaExchangeStateController> {
        let controller = &self.enabled_individual;
        Rc::clone(controller)
    }

    /// Always panics: this [`State`] holds no [`MuteStateController`].
    ///
    /// # Panics
    ///
    /// On every call.
    #[inline]
    fn mute_state_controller(&self) -> Rc<MuteStateController> {
        unreachable!("Receivers muting is not implemented")
    }
}
impl TransceiverSide for State {
    /// Returns the [`TrackId`] of this [`State`].
    #[inline]
    fn track_id(&self) -> TrackId {
        self.id
    }

    /// Returns the [`MediaKind`] matching the [`MediaType`] of this
    /// [`State`].
    #[inline]
    fn kind(&self) -> MediaKind {
        match self.media_type {
            MediaType::Video(_) => MediaKind::Video,
            MediaType::Audio(_) => MediaKind::Audio,
        }
    }

    /// Returns the [`MediaSourceKind`] of this [`State`].
    ///
    /// Audio is always reported as [`MediaSourceKind::Device`], while video
    /// reports the `source_kind` stored in its settings.
    #[inline]
    fn source_kind(&self) -> MediaSourceKind {
        match self.media_type {
            MediaType::Video(ref video) => video.source_kind,
            MediaType::Audio(_) => MediaSourceKind::Device,
        }
    }

    /// Always returns `true` for this [`State`].
    #[inline]
    fn is_transitable(&self) -> bool {
        true
    }
}
#[cfg(feature = "mockable")]
impl State {
    /// Finishes an in-progress transition of the individual media exchange
    /// state, if there is one.
    ///
    /// Both the individual and the general media exchange states are set to
    /// the state intended by that transition. Does nothing when the
    /// individual state is already stable.
    pub fn stabilize(&self) {
        use crate::peer::media::InTransition as _;

        match self.enabled_individual.state() {
            MediaExchangeState::Transition(transition) => {
                self.enabled_individual.update(transition.intended());
                self.enabled_general.set(transition.intended());
            }
            MediaExchangeState::Stable(_) => (),
        }
    }

    /// Marks this [`State`] as [`SyncState::Synced`].
    #[inline]
    pub fn synced(&self) {
        self.sync_state.set(SyncState::Synced);
    }
}