mod ice_candidates;
mod local_sdp;
mod tracks_repository;
mod watchers;
#[cfg(feature = "mockable")]
use std::future::Future;
use std::{cell::Cell, collections::HashSet, rc::Rc};
use futures::{future::LocalBoxFuture, StreamExt as _, TryFutureExt as _};
use medea_client_api_proto::{
self as proto, IceCandidate, IceServer, NegotiationRole, PeerId as Id,
TrackId,
};
use medea_reactive::{AllProcessed, ObservableCell, ProgressableCell};
use proto::{ConnectionMode, MemberId};
use tracerr::Traced;
use crate::{
media::LocalTracksConstraints,
peer::{
media::{receiver, sender},
LocalStreamUpdateCriteria, PeerConnection, UpdateLocalStreamError,
},
utils::{component, AsProtoState, SynchronizableState, Updatable},
};
use self::{
ice_candidates::IceCandidates, local_sdp::LocalSdp,
tracks_repository::TracksRepository,
};
pub use local_sdp::DESCRIPTION_APPROVE_TIMEOUT;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncState {
Desynced,
Syncing,
Synced,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum NegotiationState {
Stable,
WaitLocalSdp,
WaitLocalSdpApprove,
WaitRemoteSdp,
}
#[derive(Debug)]
pub struct State {
id: Id,
connection_mode: ConnectionMode,
senders: TracksRepository<sender::State>,
receivers: TracksRepository<receiver::State>,
force_relay: bool,
ice_servers: Vec<IceServer>,
negotiation_role: ProgressableCell<Option<NegotiationRole>>,
negotiation_state: ObservableCell<NegotiationState>,
local_sdp: LocalSdp,
remote_sdp: ProgressableCell<Option<String>>,
restart_ice: Cell<bool>,
ice_candidates: IceCandidates,
maybe_update_local_stream: ObservableCell<bool>,
maybe_update_connections:
ObservableCell<Option<(TrackId, HashSet<MemberId>)>>,
sync_state: ObservableCell<SyncState>,
}
impl State {
#[must_use]
pub fn new(
id: Id,
ice_servers: Vec<IceServer>,
force_relay: bool,
negotiation_role: Option<NegotiationRole>,
connection_mode: ConnectionMode,
) -> Self {
Self {
id,
connection_mode,
senders: TracksRepository::new(),
receivers: TracksRepository::new(),
ice_servers,
force_relay,
remote_sdp: ProgressableCell::new(None),
local_sdp: LocalSdp::new(),
negotiation_role: ProgressableCell::new(negotiation_role),
negotiation_state: ObservableCell::new(NegotiationState::Stable),
restart_ice: Cell::new(false),
ice_candidates: IceCandidates::new(),
maybe_update_local_stream: ObservableCell::new(false),
maybe_update_connections: ObservableCell::new(None),
sync_state: ObservableCell::new(SyncState::Synced),
}
}
#[must_use]
pub const fn connection_mode(&self) -> ConnectionMode {
self.connection_mode
}
#[must_use]
pub const fn id(&self) -> Id {
self.id
}
#[must_use]
pub const fn ice_servers(&self) -> &Vec<IceServer> {
&self.ice_servers
}
#[must_use]
pub const fn force_relay(&self) -> bool {
self.force_relay
}
pub fn insert_sender(&self, track_id: TrackId, sender: Rc<sender::State>) {
self.senders.insert(track_id, sender);
}
pub fn insert_receiver(
&self,
track_id: TrackId,
receiver: Rc<receiver::State>,
) {
self.receivers.insert(track_id, receiver);
}
#[must_use]
pub fn get_sender(&self, track_id: TrackId) -> Option<Rc<sender::State>> {
self.senders.get(track_id)
}
#[must_use]
pub fn get_receiver(
&self,
track_id: TrackId,
) -> Option<Rc<receiver::State>> {
self.receivers.get(track_id)
}
pub fn get_send_tracks(&self) -> Vec<TrackId> {
self.senders.ids()
}
pub fn get_recv_tracks(&self) -> Vec<TrackId> {
self.receivers.ids()
}
pub async fn set_negotiation_role(
&self,
negotiation_role: NegotiationRole,
) {
_ = self
.negotiation_role
.subscribe()
.any(|val| async move { val.is_none() })
.await;
self.negotiation_role.set(Some(negotiation_role));
}
pub fn restart_ice(&self) {
self.restart_ice.set(true);
}
pub fn remove_track(&self, track_id: TrackId) {
if !self.receivers.remove(track_id) {
_ = self.senders.remove(track_id);
}
}
pub fn set_remote_sdp(&self, sdp: String) {
self.remote_sdp.set(Some(sdp));
}
pub fn add_ice_candidate(&self, ice_candidate: IceCandidate) {
self.ice_candidates.add(ice_candidate);
}
pub fn apply_local_sdp(&self, sdp: String) {
self.local_sdp.approved_set(sdp);
}
pub fn stop_timeouts(&self) {
self.local_sdp.stop_timeout();
}
pub fn resume_timeouts(&self) {
self.local_sdp.resume_timeout();
}
pub fn local_stream_update_result(
&self,
tracks_ids: HashSet<TrackId>,
) -> LocalBoxFuture<'static, Result<(), Traced<UpdateLocalStreamError>>>
{
Box::pin(
self.senders
.local_stream_update_result(tracks_ids)
.map_err(tracerr::map_from_and_wrap!()),
)
}
pub fn when_all_updated(&self) -> AllProcessed<'static> {
medea_reactive::when_all_processed(vec![
self.senders.when_updated().into(),
self.receivers.when_updated().into(),
])
}
async fn update_local_stream(
&self,
peer: &Rc<PeerConnection>,
) -> Result<(), Traced<UpdateLocalStreamError>> {
let mut criteria = LocalStreamUpdateCriteria::empty();
let senders = self.senders.get_outdated();
for s in &senders {
criteria.add(s.media_kind(), s.media_source());
}
let res = peer
.update_local_stream(criteria)
.await
.map_err(tracerr::map_from_and_wrap!())
.map(drop);
for s in senders {
if let Err(err) = res.clone() {
s.failed_local_stream_update(err);
} else {
s.local_stream_updated();
}
}
res
}
pub fn insert_track(
&self,
track: &proto::Track,
send_constraints: LocalTracksConstraints,
) {
match &track.direction {
proto::Direction::Send { receivers, mid } => {
self.senders.insert(
track.id,
Rc::new(sender::State::new(
track.id,
mid.clone(),
track.media_type.clone(),
track.media_direction,
track.muted,
receivers.clone(),
send_constraints,
self.connection_mode,
)),
);
}
proto::Direction::Recv { sender, mid } => {
self.receivers.insert(
track.id,
Rc::new(receiver::State::new(
track.id,
mid.clone(),
track.media_type.clone(),
track.media_direction,
track.muted,
sender.clone(),
self.connection_mode,
)),
);
}
}
}
pub fn when_all_senders_processed(&self) -> AllProcessed<'static> {
self.senders.when_all_processed()
}
fn when_all_receivers_processed(&self) -> AllProcessed<'static> {
self.receivers.when_all_processed()
}
pub async fn patch_track(&self, patch: proto::TrackPatchEvent) {
if let Some(receivers) = &patch.receivers {
_ = self.maybe_update_connections.when_eq(None).await;
self.maybe_update_connections
.set(Some((patch.id, receivers.clone().into_iter().collect())));
}
if let Some(sender) = self.get_sender(patch.id) {
sender.update(patch);
_ = self.maybe_update_local_stream.when_eq(false).await;
self.maybe_update_local_stream.set(true);
} else if let Some(receiver) = self.get_receiver(patch.id) {
receiver.update(&patch);
} else {
log::warn!("Cannot apply patch to `Track`: {}", patch.id.0);
}
}
#[must_use]
pub fn current_sdp_offer(&self) -> Option<String> {
self.local_sdp.current()
}
}
pub type Component = component::Component<State, PeerConnection>;
impl AsProtoState for State {
type Output = proto::state::Peer;
fn as_proto(&self) -> Self::Output {
Self::Output {
id: self.id,
connection_mode: self.connection_mode,
senders: self.senders.as_proto(),
receivers: self.receivers.as_proto(),
ice_candidates: self.ice_candidates.as_proto(),
force_relay: self.force_relay,
ice_servers: self.ice_servers.clone(),
negotiation_role: self.negotiation_role.get(),
local_sdp: self.local_sdp.current(),
remote_sdp: self.remote_sdp.get(),
restart_ice: self.restart_ice.get(),
}
}
}
impl SynchronizableState for State {
type Input = proto::state::Peer;
fn from_proto(
input: Self::Input,
send_constraints: &LocalTracksConstraints,
) -> Self {
let state = Self::new(
input.id,
input.ice_servers,
input.force_relay,
input.negotiation_role,
input.connection_mode,
);
#[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
for (id, sender) in input.senders {
if !sender.receivers.is_empty() {
state.senders.insert(
id,
Rc::new(sender::State::from_proto(
sender,
send_constraints,
)),
);
}
}
#[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
for (id, receiver) in input.receivers {
state.receivers.insert(
id,
Rc::new(receiver::State::from_proto(
receiver,
send_constraints,
)),
);
}
#[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
for ice_candidate in input.ice_candidates {
state.ice_candidates.add(ice_candidate);
}
state
}
fn apply(&self, input: Self::Input, send_cons: &LocalTracksConstraints) {
if input.negotiation_role.is_some() {
self.negotiation_role.set(input.negotiation_role);
}
if input.restart_ice {
self.restart_ice.set(true);
}
if let Some(sdp_offer) = input.local_sdp {
self.local_sdp.approved_set(sdp_offer);
} else {
self.negotiation_state.set(NegotiationState::WaitLocalSdp);
}
self.remote_sdp.set(input.remote_sdp);
self.ice_candidates.apply(input.ice_candidates, send_cons);
self.senders.apply(input.senders, send_cons);
self.receivers.apply(input.receivers, send_cons);
self.sync_state.set(SyncState::Synced);
}
}
impl Updatable for State {
fn when_stabilized(&self) -> AllProcessed<'static> {
medea_reactive::when_all_processed(vec![
self.senders.when_stabilized().into(),
self.receivers.when_stabilized().into(),
])
}
fn when_updated(&self) -> AllProcessed<'static> {
medea_reactive::when_all_processed(vec![
self.receivers.when_updated().into(),
self.senders.when_updated().into(),
])
}
fn connection_lost(&self) {
self.sync_state.set(SyncState::Desynced);
self.senders.connection_lost();
self.receivers.connection_lost();
}
fn connection_recovered(&self) {
self.sync_state.set(SyncState::Syncing);
self.senders.connection_recovered();
self.receivers.connection_recovered();
}
}
#[cfg(feature = "mockable")]
#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
impl State {
pub async fn when_remote_sdp_processed(&self) {
self.remote_sdp.when_all_processed().await;
}
pub fn reset_negotiation_role(&self) {
self.negotiation_state.set(NegotiationState::Stable);
self.negotiation_role.set(None);
}
#[must_use]
pub fn negotiation_role(&self) -> Option<NegotiationRole> {
self.negotiation_role.get()
}
pub fn when_local_sdp_approve_needed(&self) -> impl Future<Output = ()> {
use futures::FutureExt as _;
self.negotiation_state
.when_eq(NegotiationState::WaitLocalSdpApprove)
.map(drop)
}
pub fn stabilize_all(&self) {
self.receivers.stabilize_all();
}
pub async fn when_local_sdp_updated(&self) -> Option<String> {
use futures::StreamExt as _;
self.local_sdp.subscribe().skip(1).next().await.flatten()
}
pub async fn when_all_tracks_created(&self) {
medea_reactive::when_all_processed(vec![
self.senders.when_insert_processed().into(),
self.receivers.when_insert_processed().into(),
])
.await;
}
pub fn synced(&self) {
self.senders.synced();
self.receivers.synced();
self.sync_state.set(SyncState::Synced);
}
}