mod ice_candidates;
mod local_sdp;
mod tracks_repository;
mod watchers;
use std::{
cell::Cell, collections::HashSet, num::NonZeroU32, rc::Rc, time::Duration,
};
use futures::{StreamExt as _, TryFutureExt as _, future::LocalBoxFuture};
pub use local_sdp::DESCRIPTION_APPROVE_TIMEOUT;
use medea_client_api_proto::{
self as proto, IceCandidate, IceServer, NegotiationRole, PeerId as Id,
TrackId,
};
use medea_reactive::{AllProcessed, ObservableCell, ProgressableCell};
use proto::{ConnectionMode, MemberId};
use tracerr::Traced;
use self::{
ice_candidates::IceCandidates, local_sdp::LocalSdp,
tracks_repository::TracksRepository,
};
use crate::{
media::LocalTracksConstraints,
peer::{
LocalStreamUpdateCriteria, PeerConnection, UpdateLocalStreamError,
media::{receiver, sender},
},
utils::{AsProtoState, SynchronizableState, Updatable, component},
};
/// Synchronization phase of a [`State`], stored in its `sync_phase` cell.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncPhase {
/// Phase set when [`Updatable::connection_lost()`] is called on a
/// [`State`].
Desynced,
/// Phase set when [`Updatable::connection_recovered()`] is called on a
/// [`State`].
Syncing,
/// Phase a [`State`] is created with, and the one set at the end of
/// [`SynchronizableState::apply()`].
Synced,
}
/// Negotiation phase of a [`State`], stored in its `negotiation_phase` cell.
///
/// NOTE(review): most transitions between these phases are not driven from
/// this file (presumably from the `watchers` module) — confirm there before
/// relying on a particular ordering.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum NegotiationPhase {
/// Phase a [`State`] is created with.
Stable,
/// Phase set by [`SynchronizableState::apply()`] when the applied state
/// carries no local SDP.
WaitLocalSdp,
/// Presumably: a local SDP was produced and its approval is awaited —
/// TODO confirm.
WaitLocalSdpApprove,
/// Presumably: a remote SDP is awaited — TODO confirm.
WaitRemoteSdp,
}
/// State of a single peer: its tracks, SDPs, ICE data and negotiation/sync
/// progress.
///
/// Mutable parts live in cells, so every method of this type works through
/// `&self`.
#[derive(Debug)]
pub struct State {
/// ID of this peer.
id: Id,
/// [`ConnectionMode`] handed over to every track state created by
/// [`State::insert_track()`].
connection_mode: ConnectionMode,
/// Repository of all the [`sender::State`]s of this peer.
senders: TracksRepository<sender::State>,
/// Repository of all the [`receiver::State`]s of this peer.
receivers: TracksRepository<receiver::State>,
/// Flag this [`State`] was created with.
///
/// Presumably restricts the connection to relayed ICE candidates only —
/// TODO confirm.
force_relay: bool,
/// [`IceServer`]s this [`State`] was created with.
ice_servers: Vec<IceServer>,
/// Current [`NegotiationRole`], or [`None`] when no role is set.
negotiation_role: ProgressableCell<Option<NegotiationRole>>,
/// Current negotiation phase.
negotiation_phase: ObservableCell<NegotiationPhase>,
/// Local SDP holder.
local_sdp: LocalSdp,
/// Remote SDP, or [`None`] if none has been set yet.
remote_sdp: ProgressableCell<Option<String>>,
/// Flag raised by [`State::restart_ice()`] and by applying a state which
/// requests an ICE restart.
restart_ice: Cell<bool>,
/// Store of the added [`IceCandidate`]s.
ice_candidates: IceCandidates,
/// Flag raised by [`State::set_network_changed()`].
network_changed: Cell<bool>,
/// Flag raised by [`State::patch_track()`] after a sender has been patched.
maybe_update_local_stream: ObservableCell<bool>,
/// Track ID and set of [`MemberId`]s stored by [`State::patch_track()`] for
/// a patch carrying `receivers`; [`None`] when nothing is pending.
maybe_update_connections:
ObservableCell<Option<(TrackId, HashSet<MemberId>)>>,
/// Current [`SyncPhase`].
sync_phase: ObservableCell<SyncPhase>,
/// Stats scrape interval in milliseconds; [`None`] when `0` was provided
/// on creation.
stats_scrape_interval_ms: Option<NonZeroU32>,
}
impl State {
    /// Builds a new [`State`] out of the provided parameters.
    ///
    /// Track repositories, the local SDP holder and the ICE candidates store
    /// are freshly created, no remote SDP is set, all the flags are lowered,
    /// the negotiation phase is `Stable` and the sync phase is
    /// [`SyncPhase::Synced`].
    ///
    /// A `stats_scrape_interval_ms` of `0` is stored as [`None`].
    #[must_use]
    pub fn new(
        id: Id,
        ice_servers: Vec<IceServer>,
        force_relay: bool,
        negotiation_role: Option<NegotiationRole>,
        connection_mode: ConnectionMode,
        stats_scrape_interval_ms: u32,
    ) -> Self {
        // `NonZeroU32::new()` yields `None` for `0`.
        let stats_scrape_interval_ms =
            NonZeroU32::new(stats_scrape_interval_ms);

        Self {
            id,
            connection_mode,
            senders: TracksRepository::new(),
            receivers: TracksRepository::new(),
            force_relay,
            ice_servers,
            remote_sdp: ProgressableCell::new(None),
            local_sdp: LocalSdp::new(),
            negotiation_role: ProgressableCell::new(negotiation_role),
            negotiation_phase: ObservableCell::new(NegotiationPhase::Stable),
            restart_ice: Cell::new(false),
            ice_candidates: IceCandidates::new(),
            network_changed: Cell::new(false),
            maybe_update_local_stream: ObservableCell::new(false),
            maybe_update_connections: ObservableCell::new(None),
            sync_phase: ObservableCell::new(SyncPhase::Synced),
            stats_scrape_interval_ms,
        }
    }

    /// Returns the [`ConnectionMode`] this [`State`] was created with.
    #[must_use]
    pub const fn connection_mode(&self) -> ConnectionMode {
        self.connection_mode
    }

    /// Returns the ID of this peer.
    #[must_use]
    pub const fn id(&self) -> Id {
        self.id
    }

    /// Returns the stats scrape interval as a [`Duration`].
    ///
    /// [`None`] is returned when no (non-zero) interval is stored.
    #[must_use]
    pub fn stats_scrape_interval(&self) -> Option<Duration> {
        self.stats_scrape_interval_ms
            .map(NonZeroU32::get)
            .map(u64::from)
            .map(Duration::from_millis)
    }

    /// Returns the [`IceServer`]s this [`State`] was created with.
    #[must_use]
    pub const fn ice_servers(&self) -> &Vec<IceServer> {
        &self.ice_servers
    }

    /// Returns the `force_relay` flag this [`State`] was created with.
    #[must_use]
    pub const fn force_relay(&self) -> bool {
        self.force_relay
    }

    /// Puts the given [`sender::State`] into the senders repository under
    /// the provided `track_id`.
    pub fn insert_sender(&self, track_id: TrackId, sender: Rc<sender::State>) {
        self.senders.insert(track_id, sender);
    }

    /// Puts the given [`receiver::State`] into the receivers repository
    /// under the provided `track_id`.
    pub fn insert_receiver(
        &self,
        track_id: TrackId,
        receiver: Rc<receiver::State>,
    ) {
        self.receivers.insert(track_id, receiver);
    }

    /// Looks up a [`sender::State`] by the provided `track_id`.
    #[must_use]
    pub fn get_sender(&self, track_id: TrackId) -> Option<Rc<sender::State>> {
        self.senders.get(track_id)
    }

    /// Looks up a [`receiver::State`] by the provided `track_id`.
    #[must_use]
    pub fn get_receiver(
        &self,
        track_id: TrackId,
    ) -> Option<Rc<receiver::State>> {
        self.receivers.get(track_id)
    }

    /// Returns the [`TrackId`]s held by the senders repository.
    pub fn get_send_tracks(&self) -> Vec<TrackId> {
        self.senders.ids()
    }

    /// Returns the [`TrackId`]s held by the receivers repository.
    pub fn get_recv_tracks(&self) -> Vec<TrackId> {
        self.receivers.ids()
    }

    /// Returns the [`TrackId`]s of all the tracks: senders' IDs first,
    /// followed by receivers' IDs.
    pub fn get_tracks(&self) -> Vec<TrackId> {
        let mut track_ids = self.get_send_tracks();
        track_ids.extend(self.get_recv_tracks());
        track_ids
    }

    /// Sets the provided [`NegotiationRole`] as the current one.
    ///
    /// Before setting it, waits until the `negotiation_role` subscription
    /// yields a [`None`] value (or ends).
    pub async fn set_negotiation_role(
        &self,
        negotiation_role: NegotiationRole,
    ) {
        let role_unset = self
            .negotiation_role
            .subscribe()
            .any(async |role| role.is_none());
        _ = role_unset.await;

        self.negotiation_role.set(Some(negotiation_role));
    }

    /// Raises the `restart_ice` flag.
    pub fn restart_ice(&self) {
        self.restart_ice.set(true);
    }

    /// Removes the track with the provided `track_id`.
    ///
    /// The receivers repository is tried first; the senders repository is
    /// touched only if the receivers one reports that nothing was removed.
    pub fn remove_track(&self, track_id: TrackId) {
        let removed_receiver = self.receivers.remove(track_id);
        if !removed_receiver {
            _ = self.senders.remove(track_id);
        }
    }

    /// Stores the provided SDP as the remote one.
    pub fn set_remote_sdp(&self, sdp: String) {
        self.remote_sdp.set(Some(sdp));
    }

    /// Adds the provided [`IceCandidate`] to the ICE candidates store.
    pub fn add_ice_candidate(&self, ice_candidate: IceCandidate) {
        self.ice_candidates.add(ice_candidate);
    }

    /// Hands the provided SDP over to `LocalSdp::approved_set()`.
    pub fn apply_local_sdp(&self, sdp: String) {
        self.local_sdp.approved_set(sdp);
    }

    /// Stops the timeout of the local SDP holder.
    pub fn stop_timeouts(&self) {
        self.local_sdp.stop_timeout();
    }

    /// Resumes the timeout of the local SDP holder.
    pub fn resume_timeouts(&self) {
        self.local_sdp.resume_timeout();
    }

    /// Returns a boxed future with the senders repository's local stream
    /// update result for the provided `tracks_ids`.
    ///
    /// # Errors
    ///
    /// The future resolves with an [`UpdateLocalStreamError`] produced from
    /// the repository's error via [`tracerr::map_from_and_wrap!`].
    pub fn local_stream_update_result(
        &self,
        tracks_ids: HashSet<TrackId>,
    ) -> LocalBoxFuture<'static, Result<(), Traced<UpdateLocalStreamError>>>
    {
        let update_result = self
            .senders
            .local_stream_update_result(tracks_ids)
            .map_err(tracerr::map_from_and_wrap!());
        Box::pin(update_result)
    }

    /// Returns an [`AllProcessed`] combining the `when_updated()` futures of
    /// the senders and the receivers repositories.
    pub fn when_all_updated(&self) -> AllProcessed<'static> {
        medea_reactive::when_all_processed(vec![
            self.senders.when_updated().into(),
            self.receivers.when_updated().into(),
        ])
    }

    /// Updates the local stream of the provided [`PeerConnection`] for all
    /// the outdated senders and notifies each of them about the outcome.
    ///
    /// # Errors
    ///
    /// Returns the error of [`PeerConnection::update_local_stream()`]; the
    /// very same error is also passed to every outdated sender.
    async fn update_local_stream(
        &self,
        peer: &Rc<PeerConnection>,
    ) -> Result<(), Traced<UpdateLocalStreamError>> {
        let mut criteria = LocalStreamUpdateCriteria::empty();
        let outdated = self.senders.get_outdated();
        for sender in &outdated {
            criteria.add(sender.media_kind(), sender.media_source());
        }

        let outcome = peer
            .update_local_stream(criteria)
            .await
            .map_err(tracerr::map_from_and_wrap!())
            .map(drop);

        // Every outdated sender receives its own copy of the outcome.
        for sender in outdated {
            match outcome.clone() {
                Ok(()) => {
                    sender.local_stream_updated();
                }
                Err(err) => {
                    sender.failed_local_stream_update(err);
                }
            }
        }

        outcome
    }

    /// Creates a track state out of the provided [`proto::Track`] and puts
    /// it into the repository matching the track's direction.
    ///
    /// `send_constraints` are consumed only for a `Send` direction.
    pub fn insert_track(
        &self,
        track: &proto::Track,
        send_constraints: LocalTracksConstraints,
    ) {
        let track_id = track.id;
        let mode = self.connection_mode;

        match &track.direction {
            proto::Direction::Send { receivers, mid } => {
                let state = sender::State::new(
                    track_id,
                    mid.clone(),
                    track.media_type.clone(),
                    track.media_direction,
                    track.muted,
                    receivers.clone(),
                    send_constraints,
                    mode,
                );
                self.senders.insert(track_id, Rc::new(state));
            }
            proto::Direction::Recv { sender, mid } => {
                let state = receiver::State::new(
                    track_id,
                    mid.clone(),
                    track.media_type.clone(),
                    track.media_direction,
                    track.muted,
                    sender.clone(),
                    mode,
                );
                self.receivers.insert(track_id, Rc::new(state));
            }
        }
    }

    /// Returns the `when_all_processed()` future of the senders repository.
    pub fn when_all_senders_processed(&self) -> AllProcessed<'static> {
        self.senders.when_all_processed()
    }

    /// Returns the `when_all_processed()` future of the receivers
    /// repository.
    fn when_all_receivers_processed(&self) -> AllProcessed<'static> {
        self.receivers.when_all_processed()
    }

    /// Applies the provided [`proto::TrackPatchEvent`] to the track it
    /// addresses.
    ///
    /// If the patch carries `receivers`, they are stored (together with the
    /// track ID) in `maybe_update_connections` once that cell holds [`None`].
    ///
    /// Then the patch is applied to the sender with the patch's ID (raising
    /// `maybe_update_local_stream` once it's lowered), or else to the
    /// receiver with that ID. If neither exists, a warning is logged.
    pub async fn patch_track(&self, patch: proto::TrackPatchEvent) {
        let track_id = patch.id;

        if let Some(receivers) = &patch.receivers {
            _ = self.maybe_update_connections.when_eq(None).await;
            let members = receivers.clone().into_iter().collect();
            self.maybe_update_connections.set(Some((track_id, members)));
        }

        if let Some(sender) = self.get_sender(track_id) {
            sender.update(patch);
            _ = self.maybe_update_local_stream.when_eq(false).await;
            self.maybe_update_local_stream.set(true);
            return;
        }

        if let Some(receiver) = self.get_receiver(track_id) {
            receiver.update(&patch);
            return;
        }

        log::warn!("Cannot apply patch to `Track`: {}", track_id.0);
    }

    /// Returns the current value of the local SDP holder.
    #[must_use]
    pub fn current_sdp_offer(&self) -> Option<String> {
        self.local_sdp.current()
    }

    /// Raises the `network_changed` flag.
    pub fn set_network_changed(&self) {
        self.network_changed.set(true);
    }
}
/// [`component::Component`] pairing a peer's [`State`] with its
/// [`PeerConnection`].
pub type Component = component::Component<State, PeerConnection>;
impl AsProtoState for State {
    type Output = proto::state::Peer;

    /// Takes a [`proto::state::Peer`] snapshot of this [`State`].
    ///
    /// An absent stats scrape interval is represented as `0`.
    fn as_proto(&self) -> Self::Output {
        let stats_scrape_interval_ms =
            self.stats_scrape_interval_ms.map_or(0, NonZeroU32::get);

        proto::state::Peer {
            id: self.id,
            connection_mode: self.connection_mode,
            senders: self.senders.as_proto(),
            receivers: self.receivers.as_proto(),
            ice_candidates: self.ice_candidates.as_proto(),
            force_relay: self.force_relay,
            ice_servers: self.ice_servers.clone(),
            negotiation_role: self.negotiation_role.get(),
            local_sdp: self.local_sdp.current(),
            remote_sdp: self.remote_sdp.get(),
            restart_ice: self.restart_ice.get(),
            stats_scrape_interval_ms,
        }
    }
}
impl SynchronizableState for State {
    type Input = proto::state::Peer;

    /// Builds a [`State`] out of the provided [`proto::state::Peer`].
    ///
    /// Senders whose `receivers` are empty are not inserted. The `local_sdp`,
    /// `remote_sdp` and `restart_ice` parts of the input are not used here.
    fn from_proto(
        input: Self::Input,
        send_constraints: &LocalTracksConstraints,
    ) -> Self {
        let proto::state::Peer {
            id,
            connection_mode,
            senders,
            receivers,
            ice_candidates,
            force_relay,
            ice_servers,
            negotiation_role,
            stats_scrape_interval_ms,
            ..
        } = input;

        let state = Self::new(
            id,
            ice_servers,
            force_relay,
            negotiation_role,
            connection_mode,
            stats_scrape_interval_ms,
        );

        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for (track_id, proto_sender) in senders {
            if proto_sender.receivers.is_empty() {
                continue;
            }
            let sender_state =
                sender::State::from_proto(proto_sender, send_constraints);
            state.senders.insert(track_id, Rc::new(sender_state));
        }

        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for (track_id, proto_receiver) in receivers {
            let receiver_state =
                receiver::State::from_proto(proto_receiver, send_constraints);
            state.receivers.insert(track_id, Rc::new(receiver_state));
        }

        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for candidate in ice_candidates {
            state.ice_candidates.add(candidate);
        }

        state
    }

    /// Applies the provided [`proto::state::Peer`] to this [`State`].
    ///
    /// - The negotiation role is overwritten only if the input has one.
    /// - The `restart_ice` flag is only ever raised, never lowered.
    /// - A present local SDP goes to `LocalSdp::approved_set()`; an absent
    ///   one switches the negotiation phase to `WaitLocalSdp`.
    /// - The remote SDP, ICE candidates, senders and receivers are applied
    ///   as-is, in this order.
    ///
    /// The sync phase becomes [`SyncPhase::Synced`] as the very last step.
    fn apply(&self, input: Self::Input, send_cons: &LocalTracksConstraints) {
        let proto::state::Peer {
            negotiation_role,
            restart_ice,
            local_sdp,
            remote_sdp,
            ice_candidates,
            senders,
            receivers,
            ..
        } = input;

        if let Some(role) = negotiation_role {
            self.negotiation_role.set(Some(role));
        }
        if restart_ice {
            self.restart_ice.set(true);
        }
        if let Some(sdp_offer) = local_sdp {
            self.local_sdp.approved_set(sdp_offer);
        } else {
            self.negotiation_phase.set(NegotiationPhase::WaitLocalSdp);
        }

        self.remote_sdp.set(remote_sdp);
        self.ice_candidates.apply(ice_candidates, send_cons);
        self.senders.apply(senders, send_cons);
        self.receivers.apply(receivers, send_cons);

        self.sync_phase.set(SyncPhase::Synced);
    }
}
impl Updatable for State {
/// Returns an [`AllProcessed`] combining the `when_stabilized()` futures of
/// the senders and the receivers repositories.
fn when_stabilized(&self) -> AllProcessed<'static> {
medea_reactive::when_all_processed(vec![
self.senders.when_stabilized().into(),
self.receivers.when_stabilized().into(),
])
}
/// Returns an [`AllProcessed`] combining the `when_updated()` futures of the
/// receivers and the senders repositories.
fn when_updated(&self) -> AllProcessed<'static> {
medea_reactive::when_all_processed(vec![
self.receivers.when_updated().into(),
self.senders.when_updated().into(),
])
}
/// Switches the sync phase to [`SyncPhase::Desynced`] and then notifies the
/// senders and the receivers repositories about the lost connection.
fn connection_lost(&self) {
self.sync_phase.set(SyncPhase::Desynced);
self.senders.connection_lost();
self.receivers.connection_lost();
}
/// Switches the sync phase to [`SyncPhase::Syncing`] and then notifies the
/// senders and the receivers repositories about the recovered connection.
fn connection_recovered(&self) {
self.sync_phase.set(SyncPhase::Syncing);
self.senders.connection_recovered();
self.receivers.connection_recovered();
}
}
// Helpers compiled only with the `mockable` feature enabled.
#[cfg(feature = "mockable")]
#[expect(clippy::multiple_inherent_impl, reason = "feature gated")]
impl State {
/// Waits until everything tracked by the `remote_sdp` cell is processed.
pub async fn when_remote_sdp_processed(&self) {
self.remote_sdp.when_all_processed().await;
}
/// Resets the negotiation phase to `Stable` and clears the negotiation
/// role.
pub fn reset_negotiation_role(&self) {
self.negotiation_phase.set(NegotiationPhase::Stable);
self.negotiation_role.set(None);
}
/// Returns the current [`NegotiationRole`], if any.
#[must_use]
pub fn negotiation_role(&self) -> Option<NegotiationRole> {
self.negotiation_role.get()
}
/// Returns a future resolving once the negotiation phase is
/// `WaitLocalSdpApprove`.
///
/// The returned future captures no lifetimes (`use<>`), so it doesn't
/// borrow this [`State`].
pub fn when_local_sdp_approve_needed(
&self,
) -> impl Future<Output = ()> + use<> {
use futures::FutureExt as _;
self.negotiation_phase
.when_eq(NegotiationPhase::WaitLocalSdpApprove)
// Only the moment matters, so the resolved value is discarded.
.map(drop)
}
/// Calls `stabilize_all()` on the receivers repository.
///
/// The senders repository is not touched by this method.
pub fn stabilize_all(&self) {
self.receivers.stabilize_all();
}
/// Skips the first item of the local SDP subscription and returns the one
/// following it.
///
/// [`None`] is returned if the subscription ends or the received item is
/// [`None`] itself.
pub async fn when_local_sdp_updated(&self) -> Option<String> {
use futures::StreamExt as _;
// NOTE(review): the skipped first item is presumably the current value
// emitted right upon subscribing — confirm against `LocalSdp`.
self.local_sdp.subscribe().skip(1).next().await.flatten()
}
/// Waits until insertions into both the senders and the receivers
/// repositories are processed.
pub async fn when_all_tracks_created(&self) {
medea_reactive::when_all_processed(vec![
self.senders.when_insert_processed().into(),
self.receivers.when_insert_processed().into(),
])
.await;
}
/// Calls `synced()` on the senders and the receivers repositories, and then
/// switches the sync phase to [`SyncPhase::Synced`].
pub fn synced(&self) {
self.senders.synced();
self.receivers.synced();
self.sync_phase.set(SyncPhase::Synced);
}
}