mod ice_candidates;
mod local_sdp;
mod tracks_repository;
mod watchers;
use std::{cell::Cell, collections::HashSet, rc::Rc};
use futures::{future::LocalBoxFuture, TryFutureExt as _};
use medea_client_api_proto::{
self as proto, IceCandidate, IceServer, NegotiationRole, PeerId as Id,
TrackId,
};
use medea_reactive::{AllProcessed, ObservableCell, ProgressableCell};
use tracerr::Traced;
use crate::{
media::LocalTracksConstraints,
peer::{
media::{receiver, sender},
LocalStreamUpdateCriteria, PeerConnection, PeerError,
},
utils::{component, AsProtoState, SynchronizableState, Updatable},
};
use self::{
ice_candidates::IceCandidates, local_sdp::LocalSdp,
tracks_repository::TracksRepository,
};
/// Synchronization status tracked in [`State`]'s `sync_state` field.
///
/// NOTE(review): the exact meaning of each phase is inferred from where this
/// file sets it; code observing `sync_state` is not visible here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncState {
/// Set by `State::connection_lost()`.
Desynced,
/// Set by `State::connection_recovered()`.
Syncing,
/// Initial value assigned in `State::new()`; set again at the end of
/// `State::apply()` (and by the `mockable`-only `State::synced()`).
Synced,
}
/// Negotiation phase tracked in [`State`]'s `negotiation_state` field.
///
/// NOTE(review): transitions between these phases are not performed in this
/// part of the file (presumably they live in the `watchers` submodule —
/// confirm there).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NegotiationState {
/// Initial value assigned in `State::new()`; also restored by the
/// `mockable`-only `State::reset_negotiation_role()`.
Stable,
/// Presumably: a local SDP has yet to be produced — TODO confirm.
WaitLocalSdp,
/// Phase awaited by the `mockable`-only
/// `State::when_local_sdp_approve_needed()`.
WaitLocalSdpApprove,
/// Presumably: a remote SDP is being awaited — TODO confirm.
WaitRemoteSdp,
}
/// State of a single peer, used as the state half of [`Component`].
///
/// All mutation goes through `&self`: the fields are either immutable after
/// construction or wrapped in cells/repositories providing interior
/// mutability.
#[derive(Debug)]
pub struct State {
/// ID of this peer, returned by `State::id()`.
id: Id,
/// Repository of [`sender::State`]s, keyed by [`TrackId`].
senders: TracksRepository<sender::State>,
/// Repository of [`receiver::State`]s, keyed by [`TrackId`].
receivers: TracksRepository<receiver::State>,
/// Flag returned by `State::force_relay()`.
///
/// Presumably restricts ICE to relayed candidates only — TODO confirm
/// against the `PeerConnection` setup code.
force_relay: bool,
/// List of [`IceServer`]s provided at construction.
ice_servers: Vec<IceServer>,
/// Current [`NegotiationRole`], or [`None`] when no role is set.
///
/// `State::set_negotiation_role()` waits for this to be [`None`] before
/// storing a new role.
negotiation_role: ObservableCell<Option<NegotiationRole>>,
/// Current [`NegotiationState`]; starts as [`NegotiationState::Stable`].
negotiation_state: ObservableCell<NegotiationState>,
/// Local SDP holder; see `State::apply_local_sdp()` and
/// `State::current_sdp_offer()`.
local_sdp: LocalSdp,
/// Remote SDP, [`None`] until `State::set_remote_sdp()` or
/// `State::apply()` stores one.
remote_sdp: ProgressableCell<Option<String>>,
/// Flag raised by `State::restart_ice()` (and by `State::apply()` when the
/// applied snapshot has it set).
restart_ice: Cell<bool>,
/// ICE candidates added via `State::add_ice_candidate()`.
ice_candidates: IceCandidates,
/// Flag set to `true` by `State::patch_track()` whenever a sender is
/// patched.
///
/// Presumably observed elsewhere to trigger a local stream update — TODO
/// confirm in the `watchers` submodule.
maybe_update_local_stream: ObservableCell<bool>,
/// Current [`SyncState`]; starts as [`SyncState::Synced`].
sync_state: ObservableCell<SyncState>,
}
impl State {
#[inline]
#[must_use]
pub fn new(
id: Id,
ice_servers: Vec<IceServer>,
force_relay: bool,
negotiation_role: Option<NegotiationRole>,
) -> Self {
Self {
id,
senders: TracksRepository::new(),
receivers: TracksRepository::new(),
ice_servers,
force_relay,
remote_sdp: ProgressableCell::new(None),
local_sdp: LocalSdp::new(),
negotiation_role: ObservableCell::new(negotiation_role),
negotiation_state: ObservableCell::new(NegotiationState::Stable),
restart_ice: Cell::new(false),
ice_candidates: IceCandidates::new(),
maybe_update_local_stream: ObservableCell::new(false),
sync_state: ObservableCell::new(SyncState::Synced),
}
}
#[inline]
#[must_use]
pub fn id(&self) -> Id {
self.id
}
#[inline]
#[must_use]
pub fn ice_servers(&self) -> &Vec<IceServer> {
&self.ice_servers
}
#[inline]
#[must_use]
pub fn force_relay(&self) -> bool {
self.force_relay
}
#[inline]
pub fn insert_sender(&self, track_id: TrackId, sender: Rc<sender::State>) {
self.senders.insert(track_id, sender);
}
#[inline]
pub fn insert_receiver(
&self,
track_id: TrackId,
receiver: Rc<receiver::State>,
) {
self.receivers.insert(track_id, receiver);
}
#[inline]
#[must_use]
pub fn get_sender(&self, track_id: TrackId) -> Option<Rc<sender::State>> {
self.senders.get(track_id)
}
#[inline]
#[must_use]
pub fn get_receiver(
&self,
track_id: TrackId,
) -> Option<Rc<receiver::State>> {
self.receivers.get(track_id)
}
#[inline]
pub async fn set_negotiation_role(
&self,
negotiation_role: NegotiationRole,
) {
let _ = self.negotiation_role.when(Option::is_none).await;
self.negotiation_role.set(Some(negotiation_role));
}
#[inline]
pub fn restart_ice(&self) {
self.restart_ice.set(true);
}
#[inline]
pub fn remove_track(&self, track_id: TrackId) {
if !self.receivers.remove(track_id) {
self.senders.remove(track_id);
}
}
#[inline]
pub fn set_remote_sdp(&self, sdp: String) {
self.remote_sdp.set(Some(sdp));
}
#[inline]
pub fn add_ice_candidate(&self, ice_candidate: IceCandidate) {
self.ice_candidates.add(ice_candidate);
}
#[inline]
pub fn apply_local_sdp(&self, sdp: String) {
self.local_sdp.approved_set(sdp);
}
#[inline]
pub fn stop_timeouts(&self) {
self.local_sdp.stop_timeout();
}
#[inline]
pub fn resume_timeouts(&self) {
self.local_sdp.resume_timeout();
}
pub fn local_stream_update_result(
&self,
tracks_ids: HashSet<TrackId>,
) -> LocalBoxFuture<'static, Result<(), Traced<PeerError>>> {
Box::pin(
self.senders
.local_stream_update_result(tracks_ids)
.map_err(tracerr::map_from_and_wrap!()),
)
}
#[inline]
pub fn when_all_updated(&self) -> AllProcessed<'static> {
medea_reactive::when_all_processed(vec![
self.senders.when_updated().into(),
self.receivers.when_updated().into(),
])
}
async fn update_local_stream(
&self,
peer: &Rc<PeerConnection>,
) -> Result<(), Traced<PeerError>> {
let mut criteria = LocalStreamUpdateCriteria::empty();
let senders = self.senders.get_outdated();
for s in &senders {
criteria.add(s.media_kind(), s.media_source());
}
let res = peer
.update_local_stream(criteria)
.await
.map_err(tracerr::map_from_and_wrap!())
.map(drop);
for s in senders {
if let Err(err) = res.clone() {
s.failed_local_stream_update(err);
} else {
s.local_stream_updated();
}
}
res
}
pub fn insert_track(
&self,
track: &proto::Track,
send_constraints: LocalTracksConstraints,
) -> Result<(), Traced<PeerError>> {
match &track.direction {
proto::Direction::Send { receivers, mid } => {
self.senders.insert(
track.id,
Rc::new(
sender::State::new(
track.id,
mid.clone(),
track.media_type.clone(),
receivers.clone(),
send_constraints,
)
.map_err(tracerr::map_from_and_wrap!())?,
),
);
}
proto::Direction::Recv { sender, mid } => {
self.receivers.insert(
track.id,
Rc::new(receiver::State::new(
track.id,
mid.clone(),
track.media_type.clone(),
sender.clone(),
)),
);
}
}
Ok(())
}
#[inline]
fn when_all_senders_processed(&self) -> AllProcessed<'static> {
self.senders.when_all_processed()
}
#[inline]
fn when_all_receivers_processed(&self) -> AllProcessed<'static> {
self.receivers.when_all_processed()
}
pub fn patch_track(&self, track_patch: &proto::TrackPatchEvent) {
if let Some(sender) = self.get_sender(track_patch.id) {
sender.update(track_patch);
self.maybe_update_local_stream.set(true);
} else if let Some(receiver) = self.get_receiver(track_patch.id) {
receiver.update(track_patch);
}
}
#[inline]
#[must_use]
pub fn current_sdp_offer(&self) -> Option<String> {
self.local_sdp.current()
}
}
/// [`component::Component`] pairing the peer [`State`] with its
/// [`PeerConnection`].
pub type Component = component::Component<State, PeerConnection>;
impl AsProtoState for State {
    type Output = proto::state::Peer;

    /// Produces a [`proto::state::Peer`] snapshot of this [`State`].
    ///
    /// Track repositories and ICE candidates are converted with their own
    /// `as_proto()`; the remaining values are read (or cloned) as they
    /// currently are.
    #[inline]
    fn as_proto(&self) -> Self::Output {
        let senders = self.senders.as_proto();
        let receivers = self.receivers.as_proto();
        let ice_candidates = self.ice_candidates.as_proto();
        let ice_servers = self.ice_servers.clone();
        let negotiation_role = self.negotiation_role.get();
        let local_sdp = self.local_sdp.current();
        let remote_sdp = self.remote_sdp.get();
        let restart_ice = self.restart_ice.get();

        Self::Output {
            id: self.id,
            senders,
            receivers,
            ice_candidates,
            force_relay: self.force_relay,
            ice_servers,
            negotiation_role,
            local_sdp,
            remote_sdp,
            restart_ice,
        }
    }
}
impl SynchronizableState for State {
    type Input = proto::state::Peer;

    /// Builds a new [`State`] out of the provided [`proto::state::Peer`].
    ///
    /// Senders and receivers are converted with their own `from_proto()`
    /// (using `send_cons`), and every ICE candidate is added.
    ///
    /// NOTE(review): `local_sdp`, `remote_sdp` and `restart_ice` of the
    /// provided snapshot are not transferred here — confirm this is intended.
    fn from_proto(
        from: Self::Input,
        send_cons: &LocalTracksConstraints,
    ) -> Self {
        let state = Self::new(
            from.id,
            from.ice_servers,
            from.force_relay,
            from.negotiation_role,
        );

        for (track_id, proto_sender) in from.senders {
            let sender_state = sender::State::from_proto(proto_sender, send_cons);
            state.senders.insert(track_id, Rc::new(sender_state));
        }

        for (track_id, proto_receiver) in from.receivers {
            let receiver_state =
                receiver::State::from_proto(proto_receiver, send_cons);
            state.receivers.insert(track_id, Rc::new(receiver_state));
        }

        for candidate in from.ice_candidates {
            state.ice_candidates.add(candidate);
        }

        state
    }

    /// Applies the provided [`proto::state::Peer`] snapshot to this
    /// [`State`].
    ///
    /// - The negotiation role is overwritten only if the snapshot has one.
    /// - The `restart_ice` flag is only ever raised, never lowered.
    /// - The local SDP is passed to `approved_set()` only if the snapshot
    ///   has one.
    /// - The remote SDP is overwritten unconditionally (even with [`None`]).
    /// - ICE candidates, senders and receivers are delegated to their own
    ///   `apply()`.
    ///
    /// Finally, the sync state is switched to [`SyncState::Synced`].
    fn apply(&self, state: Self::Input, send_cons: &LocalTracksConstraints) {
        if let Some(role) = state.negotiation_role {
            self.negotiation_role.set(Some(role));
        }
        if state.restart_ice {
            self.restart_ice.set(true);
        }
        if let Some(approved_sdp) = state.local_sdp {
            self.local_sdp.approved_set(approved_sdp);
        }
        self.remote_sdp.set(state.remote_sdp);

        self.ice_candidates.apply(state.ice_candidates, send_cons);
        self.senders.apply(state.senders, send_cons);
        self.receivers.apply(state.receivers, send_cons);

        self.sync_state.set(SyncState::Synced);
    }
}
impl Updatable for State {
    /// Returns an [`AllProcessed`] combining the `when_stabilized()` futures
    /// of the senders and the receivers repositories.
    #[inline]
    fn when_stabilized(&self) -> AllProcessed<'static> {
        let pending = vec![
            self.senders.when_stabilized().into(),
            self.receivers.when_stabilized().into(),
        ];
        medea_reactive::when_all_processed(pending)
    }

    /// Returns an [`AllProcessed`] combining the `when_updated()` futures of
    /// the receivers and the senders repositories.
    #[inline]
    fn when_updated(&self) -> AllProcessed<'static> {
        let pending = vec![
            self.receivers.when_updated().into(),
            self.senders.when_updated().into(),
        ];
        medea_reactive::when_all_processed(pending)
    }

    /// Switches the sync state to [`SyncState::Desynced`] and notifies both
    /// track repositories about the lost connection.
    #[inline]
    fn connection_lost(&self) {
        self.sync_state.set(SyncState::Desynced);

        self.senders.connection_lost();
        self.receivers.connection_lost();
    }

    /// Switches the sync state to [`SyncState::Syncing`] and notifies both
    /// track repositories about the recovered connection.
    #[inline]
    fn connection_recovered(&self) {
        self.sync_state.set(SyncState::Syncing);

        self.senders.connection_recovered();
        self.receivers.connection_recovered();
    }
}
#[cfg(feature = "mockable")]
impl State {
    /// Waits until every update of the remote SDP is processed.
    #[inline]
    pub async fn when_remote_sdp_processed(&self) {
        let processed = self.remote_sdp.when_all_processed();
        processed.await;
    }

    /// Resets the negotiation: switches the negotiation state back to
    /// [`NegotiationState::Stable`] and then clears the negotiation role.
    #[inline]
    pub fn reset_negotiation_role(&self) {
        self.negotiation_state.set(NegotiationState::Stable);
        self.negotiation_role.set(None);
    }

    /// Returns the currently stored [`NegotiationRole`], if any.
    #[inline]
    #[must_use]
    pub fn negotiation_role(&self) -> Option<NegotiationRole> {
        self.negotiation_role.get()
    }

    /// Returns a future resolving once the negotiation state equals
    /// [`NegotiationState::WaitLocalSdpApprove`].
    ///
    /// The output of the underlying `when_eq()` future is dropped.
    #[inline]
    pub fn when_local_sdp_approve_needed(
        &self,
    ) -> impl std::future::Future<Output = ()> {
        use futures::FutureExt as _;

        let approve_needed = self
            .negotiation_state
            .when_eq(NegotiationState::WaitLocalSdpApprove);
        approve_needed.map(drop)
    }

    /// Calls `stabilize_all()` on the receivers repository.
    ///
    /// Senders are not touched by this method.
    #[inline]
    pub fn stabilize_all(&self) {
        self.receivers.stabilize_all();
    }

    /// Waits for an update of the local SDP and returns it.
    ///
    /// The first item yielded by the [`LocalSdp`] subscription is skipped
    /// (presumably it is the current value — TODO confirm in `LocalSdp`), and
    /// the one following it is returned.
    ///
    /// # Panics
    ///
    /// If the subscription stream ends before yielding a second item.
    #[inline]
    #[must_use]
    pub async fn when_local_sdp_updated(&self) -> Option<String> {
        use futures::StreamExt as _;

        let mut updates = self.local_sdp.subscribe().skip(1);
        updates.next().await.unwrap()
    }

    /// Waits until insertions into both the senders and the receivers
    /// repositories are processed.
    #[inline]
    pub async fn when_all_tracks_created(&self) {
        let pending = vec![
            self.senders.when_insert_processed().into(),
            self.receivers.when_insert_processed().into(),
        ];
        medea_reactive::when_all_processed(pending).await;
    }

    /// Marks both track repositories as synced and switches the sync state
    /// to [`SyncState::Synced`].
    #[inline]
    pub fn synced(&self) {
        self.senders.synced();
        self.receivers.synced();

        self.sync_state.set(SyncState::Synced);
    }
}