use std::{
collections::{HashMap, HashSet},
convert::TryFrom,
fmt,
rc::Rc,
};
use derive_more::Display;
use failure::Fail;
use medea_client_api_proto::{
state, AudioSettings, Direction, IceCandidate, IceServer, MediaSourceKind,
MediaType, MemberId, NegotiationRole, PeerId as Id, PeerId, PeerUpdate,
Track, TrackId, TrackPatchCommand, TrackPatchEvent, VideoSettings,
};
use medea_macro::{dispatchable, enum_delegate};
use crate::{
api::control::endpoints::webrtc_publish_endpoint::PublishPolicy,
media::MediaTrack,
signalling::{
elements::endpoints::{
webrtc::WebRtcPublishEndpoint, Endpoint, WeakEndpoint,
},
peers::Counter,
},
turn::IceUser,
};
/// Subscriber to the updates of a `Peer` that require either a new
/// negotiation round or a forced (negotiation-less) update on the client.
#[cfg_attr(test, mockall::automock)]
pub trait PeerUpdatesSubscriber: fmt::Debug {
    /// Notifies that the `Peer` with the provided `peer_id` must start a
    /// (re)negotiation process.
    fn negotiation_needed(&self, peer_id: PeerId);
    /// Notifies that the provided `changes` must be applied to the `Peer`
    /// with the provided `peer_id` without a negotiation round.
    fn force_update(&self, peer_id: PeerId, changes: Vec<PeerUpdate>);
}
#[cfg(test)]
impl fmt::Debug for MockPeerUpdatesSubscriber {
    /// Renders just the mock's type name: it has no fields worth printing,
    /// and `debug_struct(..).finish()` without fields prints the same text.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("MockPeerUpdatesSubscriber")
    }
}
/// State marker: `Peer` is waiting for its local SDP description.
#[derive(Debug, PartialEq)]
pub struct WaitLocalSdp;
/// State marker: `Peer` is waiting for the remote SDP description.
#[derive(Debug, PartialEq)]
pub struct WaitRemoteSdp;
/// State marker: no negotiation is in progress for the `Peer`.
#[derive(Debug, PartialEq)]
pub struct Stable;
/// Errors of the `Peer` state machine.
#[derive(Debug, Display, Fail)]
pub enum PeerError {
    /// A `PeerStateMachine` was required to be in a concrete state, but was
    /// in another one. Carries the `Peer`'s ID, the expected state name and
    /// the actual state's `Display` representation.
    #[display(
        fmt = "Cannot unwrap Peer from PeerStateMachine [id = {}]. Expected \
               state {} was {}",
        _0,
        _1,
        _2
    )]
    WrongState(Id, &'static str, String),
    /// A track is negotiated without its transceiver `mid` being provided.
    #[display(
        fmt = "Peer is sending Track [{}] without providing its mid",
        _0
    )]
    MidsMismatch(TrackId),
}
impl PeerError {
    /// Builds a [`PeerError::WrongState`] for the provided `peer`.
    ///
    /// `expected` names the state the caller required; the actual state is
    /// rendered from the `peer`'s `Display` implementation.
    #[inline]
    #[must_use]
    pub fn new_wrong_state(
        peer: &PeerStateMachine,
        expected: &'static str,
    ) -> Self {
        // `to_string()` uses the same `Display` impl as `format!("{}", ..)`
        // but is the idiomatic (and clippy-clean) way to stringify.
        PeerError::WrongState(peer.id(), expected, peer.to_string())
    }
}
// Typestate machine over `Peer<S>`: each variant wraps the `Peer` in one of
// its negotiation states. The `enum_delegate` attributes below generate
// methods on the enum that forward to the wrapped `Peer`, whichever state it
// is in.
#[enum_delegate(pub fn id(&self) -> Id)]
#[enum_delegate(pub fn member_id(&self) -> &MemberId)]
#[enum_delegate(pub fn partner_peer_id(&self) -> Id)]
#[enum_delegate(pub fn partner_member_id(&self) -> &MemberId)]
#[enum_delegate(pub fn local_sdp(&self) -> Option<&str>)]
#[enum_delegate(pub fn remote_sdp(&self) -> Option<&str>)]
#[enum_delegate(pub fn is_force_relayed(&self) -> bool)]
#[enum_delegate(pub fn ice_servers_list(&self) -> Option<Vec<IceServer>>)]
#[enum_delegate(pub fn set_ice_user(&mut self, ice_user: IceUser))]
#[enum_delegate(pub fn endpoints(&self) -> Vec<WeakEndpoint>)]
#[enum_delegate(pub fn add_endpoint(&mut self, endpoint: &Endpoint))]
#[enum_delegate(
    pub fn receivers(&self) -> &HashMap<TrackId, Rc<MediaTrack>>
)]
#[enum_delegate(pub fn senders(&self) -> &HashMap<TrackId, Rc<MediaTrack>>)]
#[enum_delegate(
    pub fn get_updates(&self) -> Vec<PeerUpdate>
)]
#[enum_delegate(pub fn as_changes_scheduler(&mut self) -> PeerChangesScheduler)]
#[enum_delegate(fn inner_force_commit_scheduled_changes(&mut self))]
#[enum_delegate(
    pub fn add_ice_candidate(&mut self, ice_candidate: IceCandidate)
)]
#[enum_delegate(pub fn is_empty(&self) -> bool)]
#[enum_delegate(pub fn ice_candidates(&self) -> &HashSet<IceCandidate>)]
#[enum_delegate(pub fn is_ice_restart(&self) -> bool)]
#[enum_delegate(pub fn negotiation_role(&self) -> Option<NegotiationRole>)]
#[enum_delegate(pub fn is_known_to_remote(&self) -> bool)]
#[enum_delegate(pub fn force_commit_partner_changes(&mut self))]
#[enum_delegate(pub fn set_initialized(&mut self))]
#[derive(Debug)]
pub enum PeerStateMachine {
    // `Peer` waiting for its local SDP description.
    WaitLocalSdp(Peer<WaitLocalSdp>),
    // `Peer` waiting for the remote SDP description.
    WaitRemoteSdp(Peer<WaitRemoteSdp>),
    // `Peer` with no negotiation in progress.
    Stable(Peer<Stable>),
}
impl PeerStateMachine {
    /// Builds the [`state::Sender`]s of all this `Peer`'s send tracks,
    /// keyed by their [`TrackId`]s.
    #[must_use]
    fn get_senders_states(&self) -> HashMap<TrackId, state::Sender> {
        self.senders()
            .iter()
            .map(|(id, sender)| {
                (
                    *id,
                    state::Sender {
                        id: sender.id(),
                        mid: sender.mid(),
                        media_type: sender.media_type().clone(),
                        receivers: vec![self.partner_member_id().clone()],
                        enabled_individual: sender
                            .send_media_state()
                            .is_enabled(),
                        enabled_general: sender.is_enabled_general(),
                        muted: sender.send_media_state().is_muted(),
                    },
                )
            })
            .collect()
    }

    /// Builds the [`state::Receiver`]s of all this `Peer`'s receive tracks,
    /// keyed by their [`TrackId`]s.
    #[must_use]
    fn get_receivers_states(&self) -> HashMap<TrackId, state::Receiver> {
        self.receivers()
            .iter()
            .map(|(id, receiver)| {
                (
                    *id,
                    state::Receiver {
                        id: *id,
                        mid: receiver.mid(),
                        media_type: receiver.media_type().clone(),
                        sender_id: self.partner_member_id().clone(),
                        enabled_individual: receiver
                            .recv_media_state()
                            .is_enabled(),
                        enabled_general: receiver.is_enabled_general(),
                        muted: receiver.recv_media_state().is_muted(),
                    },
                )
            })
            .collect()
    }

    /// Builds the full synchronizable [`state::Peer`] of this `Peer`.
    ///
    /// # Panics
    ///
    /// Panics if no [`IceUser`] has been set for this `Peer` yet.
    #[must_use]
    pub fn get_state(&self) -> state::Peer {
        state::Peer {
            id: self.id(),
            senders: self.get_senders_states(),
            receivers: self.get_receivers_states(),
            force_relay: self.is_force_relayed(),
            // Replaced a bare `unwrap()` with `expect` stating the invariant.
            ice_servers: self
                .ice_servers_list()
                .expect("IceUser must be set before Peer state is built"),
            negotiation_role: self.negotiation_role(),
            local_sdp: self.local_sdp().map(ToOwned::to_owned),
            remote_sdp: self.remote_sdp().map(ToOwned::to_owned),
            ice_candidates: self.ice_candidates().clone(),
            restart_ice: self.is_ice_restart(),
        }
    }

    /// Commits the scheduled changes if this `Peer` is in a [`Stable`]
    /// state, returning whether the commit actually happened.
    #[inline]
    pub fn commit_scheduled_changes(&mut self) -> bool {
        if let PeerStateMachine::Stable(this) = self {
            this.commit_scheduled_changes();
            true
        } else {
            false
        }
    }

    /// Commits the scheduled changes normally when [`Stable`], otherwise
    /// forcibly applies whatever changes can be force-applied.
    #[inline]
    pub fn force_commit_scheduled_changes(&mut self) {
        if !self.commit_scheduled_changes() {
            self.inner_force_commit_scheduled_changes();
        }
    }

    /// Indicates whether this `Peer` is in a [`Stable`] state.
    #[inline]
    #[must_use]
    pub fn is_stable(&self) -> bool {
        matches!(self, PeerStateMachine::Stable(_))
    }

    /// Indicates whether partner-initiated patches may be forcibly applied
    /// in the current negotiation phase.
    #[inline]
    #[must_use]
    pub fn can_forcibly_commit_partner_patches(&self) -> bool {
        match &self {
            PeerStateMachine::Stable(peer) => peer.context.is_known_to_remote,
            PeerStateMachine::WaitLocalSdp(peer) => {
                !peer.context.is_known_to_remote
            }
            PeerStateMachine::WaitRemoteSdp(peer) => {
                peer.context.local_sdp.is_some()
                    && !peer.context.is_known_to_remote
            }
        }
    }
}
impl fmt::Display for PeerStateMachine {
    /// Writes the name of the current negotiation state.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let name = match self {
            PeerStateMachine::WaitRemoteSdp(_) => "WaitRemoteSdp",
            PeerStateMachine::WaitLocalSdp(_) => "WaitLocalSdp",
            PeerStateMachine::Stable(_) => "Stable",
        };
        f.write_str(name)
    }
}
// Generates the `TryFrom`/`From` conversions between `PeerStateMachine` and
// a concrete `Peer<$peer_type>` state. Failed `TryFrom` yields a
// `PeerError::WrongState`; the owning variant also returns the machine back
// so the caller does not lose the `Peer`.
macro_rules! impl_peer_converts {
    ($peer_type:tt) => {
        impl<'a> TryFrom<&'a PeerStateMachine> for &'a Peer<$peer_type> {
            type Error = PeerError;
            fn try_from(
                peer: &'a PeerStateMachine,
            ) -> Result<Self, Self::Error> {
                match peer {
                    PeerStateMachine::$peer_type(peer) => Ok(peer),
                    _ => Err(PeerError::WrongState(
                        peer.id(),
                        stringify!($peer_type),
                        format!("{}", peer),
                    )),
                }
            }
        }
        impl TryFrom<PeerStateMachine> for Peer<$peer_type> {
            // Return the machine alongside the error, so ownership is kept.
            type Error = (PeerError, PeerStateMachine);
            fn try_from(peer: PeerStateMachine) -> Result<Self, Self::Error> {
                match peer {
                    PeerStateMachine::$peer_type(peer) => Ok(peer),
                    _ => Err((
                        PeerError::WrongState(
                            peer.id(),
                            stringify!($peer_type),
                            format!("{}", peer),
                        ),
                        peer,
                    )),
                }
            }
        }
        impl From<Peer<$peer_type>> for PeerStateMachine {
            fn from(peer: Peer<$peer_type>) -> Self {
                PeerStateMachine::$peer_type(peer)
            }
        }
    };
}
// Conversions for every negotiation state.
impl_peer_converts!(WaitLocalSdp);
impl_peer_converts!(WaitRemoteSdp);
impl_peer_converts!(Stable);
/// Action to perform once the current negotiation finishes.
#[derive(Clone, Copy, Debug)]
enum OnNegotiationFinish {
    /// Another negotiation round must be started immediately.
    Renegotiate,
    /// Nothing to do.
    Noop,
}
/// Whether the `Peer` has finished its initial setup on the client side.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum InitializationState {
    /// Initialization is complete; changes may be committed.
    Done,
    /// Initialization is still in progress.
    InProgress,
}
/// State shared by a [`Peer`] across all its negotiation states.
#[derive(Debug)]
pub struct Context {
    /// ID of this `Peer`.
    id: Id,
    /// ID of the `Member` owning this `Peer`.
    member_id: MemberId,
    /// ID of the interconnected partner `Peer`.
    partner_peer: Id,
    /// ID of the `Member` owning the partner `Peer`.
    partner_member: MemberId,
    /// ICE credentials of this `Peer`, if already allocated.
    ice_user: Option<IceUser>,
    /// Latest local SDP description.
    local_sdp: Option<String>,
    /// Latest remote SDP description.
    remote_sdp: Option<String>,
    /// Receiving tracks, keyed by their IDs.
    receivers: HashMap<TrackId, Rc<MediaTrack>>,
    /// Sending tracks, keyed by their IDs.
    senders: HashMap<TrackId, Rc<MediaTrack>>,
    /// Whether media must be forcibly relayed (TURN).
    is_force_relayed: bool,
    /// Endpoints related to this `Peer` (weakly referenced).
    endpoints: Vec<WeakEndpoint>,
    /// Whether the client side already knows about this `Peer`.
    is_known_to_remote: bool,
    /// Changes already dispatched but not yet negotiated.
    pending_peer_changes: Vec<PeerChange>,
    /// Changes scheduled but not yet dispatched.
    peer_changes_queue: Vec<PeerChange>,
    /// Subscriber notified about required negotiations/force updates.
    peer_updates_sub: Rc<dyn PeerUpdatesSubscriber>,
    /// ICE candidates discovered for this `Peer`.
    ice_candidates: HashSet<IceCandidate>,
    /// Whether an ICE restart is currently required.
    ice_restart: bool,
    /// Negotiation role of this `Peer`, if a negotiation is in progress.
    negotiation_role: Option<NegotiationRole>,
    /// Action to perform when the current negotiation finishes.
    on_negotiation_finish: OnNegotiationFinish,
    /// Client-side initialization progress of this `Peer`.
    initialization_state: InitializationState,
}
/// Mutation of a [`Peer`]'s media state; dispatched via the generated
/// [`PeerChangeHandler`] trait.
#[dispatchable]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PeerChange {
    /// Addition of a sending [`MediaTrack`].
    AddSendTrack(Rc<MediaTrack>),
    /// Addition of a receiving [`MediaTrack`].
    AddRecvTrack(Rc<MediaTrack>),
    /// Removal of the track with the given ID.
    RemoveTrack(TrackId),
    /// Patch initiated by this `Peer`'s own `Member`.
    TrackPatch(TrackPatchEvent),
    /// Patch initiated by the partner `Peer`'s `Member`.
    PartnerTrackPatch(TrackPatchEvent),
    /// ICE restart request.
    IceRestart,
}
impl PeerChange {
    /// Indicates whether this change may be applied regardless of the
    /// negotiation state: only pure mute/unmute patches (no `enabled_*`
    /// fields set) qualify.
    #[inline]
    #[must_use]
    fn is_negotiation_state_agnostic(&self) -> bool {
        matches!(
            self,
            Self::TrackPatch(TrackPatchEvent {
                id: _,
                enabled_individual: None,
                enabled_general: None,
                muted: Some(_),
            })
        )
    }

    /// Returns the [`Track`] created by this change, if it adds one.
    fn as_new_track(&self, partner_member_id: MemberId) -> Option<Track> {
        if let PeerUpdate::Added(track) = self.as_peer_update(partner_member_id)
        {
            Some(track)
        } else {
            None
        }
    }

    /// Converts this change into the client-facing [`PeerUpdate`].
    fn as_peer_update(&self, partner_member_id: MemberId) -> PeerUpdate {
        match self {
            Self::AddSendTrack(t) => PeerUpdate::Added(Track {
                id: t.id(),
                media_type: t.media_type().clone(),
                direction: Direction::Send {
                    receivers: vec![partner_member_id],
                    mid: t.mid(),
                },
            }),
            Self::AddRecvTrack(t) => PeerUpdate::Added(Track {
                id: t.id(),
                media_type: t.media_type().clone(),
                direction: Direction::Recv {
                    sender: partner_member_id,
                    mid: t.mid(),
                },
            }),
            Self::RemoveTrack(id) => PeerUpdate::Removed(*id),
            Self::TrackPatch(p) | Self::PartnerTrackPatch(p) => {
                PeerUpdate::Updated(p.clone())
            }
            Self::IceRestart => PeerUpdate::IceRestart,
        }
    }

    /// Indicates whether this change may be applied without negotiation:
    /// only track patches can; additions, removals and ICE restarts cannot.
    fn can_force_apply(&self) -> bool {
        matches!(self, Self::TrackPatch(_) | Self::PartnerTrackPatch(_))
    }
}
impl<T> PeerChangeHandler for Peer<T> {
    type Output = PeerChange;

    /// Registers the `track` as a sender of this `Peer`.
    #[inline]
    fn on_add_send_track(&mut self, track: Rc<MediaTrack>) -> Self::Output {
        self.context.senders.insert(track.id(), Rc::clone(&track));
        PeerChange::AddSendTrack(track)
    }

    /// Registers the `track` as a receiver of this `Peer`.
    #[inline]
    fn on_add_recv_track(&mut self, track: Rc<MediaTrack>) -> Self::Output {
        self.context.receivers.insert(track.id(), Rc::clone(&track));
        PeerChange::AddRecvTrack(track)
    }

    /// Applies the `patch` to the affected track's media state and fills in
    /// the resulting `enabled_general` value.
    fn on_track_patch(&mut self, mut patch: TrackPatchEvent) -> Self::Output {
        let (track, is_tx) = if let Some(tx) = self.senders().get(&patch.id) {
            (tx, true)
        } else if let Some(rx) = self.receivers().get(&patch.id) {
            (rx, false)
        } else {
            // Unknown track: forward the patch unchanged.
            return PeerChange::TrackPatch(patch);
        };
        if let Some(enabled) = patch.enabled_individual {
            if is_tx {
                track.send_media_state().set_enabled(enabled);
            } else {
                track.recv_media_state().set_enabled(enabled);
            }
            patch.enabled_general = Some(track.is_enabled_general());
        }
        if let Some(muted) = patch.muted {
            if is_tx {
                track.send_media_state().set_muted(muted);
            } else {
                track.recv_media_state().set_muted(muted);
            }
        }
        PeerChange::TrackPatch(patch)
    }

    /// Handles a patch initiated by the partner `Peer`: the individual
    /// enabled flag belongs to the partner, so it is dropped here and only
    /// the resulting general state is reported when it matches.
    fn on_partner_track_patch(
        &mut self,
        mut patch: TrackPatchEvent,
    ) -> Self::Output {
        if let Some(enabled_individual) = patch.enabled_individual {
            patch.enabled_individual = None;
            let track = self
                .senders()
                .get(&patch.id)
                .or_else(|| self.receivers().get(&patch.id));
            if let Some(track) = track {
                if enabled_individual == track.is_enabled_general() {
                    patch.enabled_general = Some(track.is_enabled_general());
                }
            }
        }
        // BUGFIX: this handler must yield a `PartnerTrackPatch`, not a
        // `TrackPatch`. `force_commit_partner_changes()` builds its dedup
        // whitelist by matching `PeerChange::PartnerTrackPatch` on the
        // dispatched results, so returning `TrackPatch` here left that
        // whitelist permanently empty and silently dropped all
        // partner-initiated updates.
        PeerChange::PartnerTrackPatch(patch)
    }

    /// Removes the track with the given ID from both directions.
    #[inline]
    fn on_remove_track(&mut self, track_id: TrackId) -> Self::Output {
        self.context.senders.remove(&track_id);
        self.context.receivers.remove(&track_id);
        PeerChange::RemoveTrack(track_id)
    }

    /// Marks that an ICE restart is required for this `Peer`.
    #[inline]
    fn on_ice_restart(&mut self) -> Self::Output {
        self.context.ice_restart = true;
        PeerChange::IceRestart
    }
}
/// Accumulator merging force-applicable track patches, one merged
/// [`TrackPatchEvent`] per [`TrackId`].
struct TrackPatchDeduper {
    /// Merged patch per track.
    result: HashMap<TrackId, TrackPatchEvent>,
    /// When set, only patches for these tracks are merged; others are left
    /// in place.
    whitelist: Option<HashSet<TrackId>>,
}

impl TrackPatchDeduper {
    /// Creates a deduper merging patches for any track.
    fn new() -> Self {
        Self {
            result: HashMap::new(),
            whitelist: None,
        }
    }

    /// Creates a deduper merging only patches whose IDs are in `whitelist`.
    fn with_whitelist(whitelist: HashSet<TrackId>) -> Self {
        Self {
            result: HashMap::new(),
            whitelist: Some(whitelist),
        }
    }

    /// Drains mergeable patches out of `changes`, merging them into this
    /// deduper. Non-patch changes and non-whitelisted patches are retained.
    fn drain_merge(&mut self, changes: &mut Vec<PeerChange>) {
        changes.retain(|change| {
            if !change.can_force_apply() {
                return true;
            }
            let patch = match change {
                PeerChange::TrackPatch(patch)
                | PeerChange::PartnerTrackPatch(patch) => patch,
                _ => return true,
            };
            // `if let` instead of `is_some()` + `unwrap()`.
            if let Some(whitelist) = &self.whitelist {
                if !whitelist.contains(&patch.id) {
                    return true;
                }
            }
            self.result
                .entry(patch.id)
                .or_insert_with(|| TrackPatchEvent::new(patch.id))
                .merge(patch);
            false
        });
    }

    /// Returns the merged patches as [`PeerChange::TrackPatch`]es.
    fn into_inner(self) -> impl Iterator<Item = PeerChange> {
        self.result
            .into_iter()
            .map(|(_, patch)| PeerChange::TrackPatch(patch))
    }
}
/// WebRTC `Peer` of a `Member`, parametrized by its negotiation state `S`.
#[derive(Debug)]
pub struct Peer<S> {
    /// Data shared across all the negotiation states.
    context: Context,
    /// Current negotiation state marker.
    state: S,
}
impl<T> Peer<T> {
#[inline]
pub fn member_id(&self) -> &MemberId {
&self.context.member_id
}
#[inline]
pub fn id(&self) -> Id {
self.context.id
}
#[inline]
pub fn partner_peer_id(&self) -> Id {
self.context.partner_peer
}
#[inline]
pub fn partner_member_id(&self) -> &MemberId {
&self.context.partner_member
}
#[inline]
#[must_use]
pub fn local_sdp(&self) -> Option<&str> {
self.context.local_sdp.as_deref()
}
#[inline]
#[must_use]
pub fn remote_sdp(&self) -> Option<&str> {
self.context.remote_sdp.as_deref()
}
pub fn get_updates(&self) -> Vec<PeerUpdate> {
self.context
.pending_peer_changes
.iter()
.map(|c| c.as_peer_update(self.partner_member_id().clone()))
.collect()
}
pub fn new_tracks(&self) -> Vec<Track> {
self.context
.pending_peer_changes
.iter()
.filter_map(|c| c.as_new_track(self.partner_member_id().clone()))
.collect()
}
#[inline]
pub fn is_sender(&self) -> bool {
!self.context.senders.is_empty()
}
#[inline]
pub fn is_force_relayed(&self) -> bool {
self.context.is_force_relayed
}
#[inline]
pub fn ice_servers_list(&self) -> Option<Vec<IceServer>> {
self.context.ice_user.as_ref().map(IceUser::servers_list)
}
#[inline]
pub fn set_ice_user(&mut self, ice_user: IceUser) {
self.context.ice_user.replace(ice_user);
}
#[inline]
pub fn endpoints(&self) -> Vec<WeakEndpoint> {
self.context.endpoints.clone()
}
pub fn add_endpoint(&mut self, endpoint: &Endpoint) {
match endpoint {
Endpoint::WebRtcPlayEndpoint(play) => {
play.set_peer_id_and_partner_peer_id(
self.id(),
self.partner_peer_id(),
);
}
Endpoint::WebRtcPublishEndpoint(publish) => {
publish.add_peer_id(self.id());
}
}
self.context.endpoints.push(endpoint.downgrade());
}
#[inline]
pub fn receivers(&self) -> &HashMap<TrackId, Rc<MediaTrack>> {
&self.context.receivers
}
#[inline]
pub fn senders(&self) -> &HashMap<TrackId, Rc<MediaTrack>> {
&self.context.senders
}
pub fn force_commit_partner_changes(&mut self) {
let mut partner_patches = Vec::new();
let mut i = 0;
while i != self.context.peer_changes_queue.len() {
if matches!(
self.context.peer_changes_queue[i],
PeerChange::PartnerTrackPatch(_)
) {
let change = self
.context
.peer_changes_queue
.remove(i)
.dispatch_with(self);
partner_patches.push(change);
} else {
i += 1;
}
}
let mut deduper = TrackPatchDeduper::with_whitelist(
partner_patches
.iter()
.filter_map(|t| match t {
PeerChange::PartnerTrackPatch(patch) => Some(patch.id),
_ => None,
})
.collect(),
);
deduper.drain_merge(&mut self.context.pending_peer_changes);
deduper.drain_merge(&mut partner_patches);
let updates: Vec<_> = deduper
.into_inner()
.map(|c| c.as_peer_update(self.partner_member_id().clone()))
.collect();
if !updates.is_empty() {
self.context
.peer_updates_sub
.force_update(self.id(), updates);
}
}
    /// Forcibly applies every force-applicable scheduled change (track
    /// patches), leaving non-forcible changes in the queue, and notifies the
    /// subscriber with the merged updates.
    pub fn inner_force_commit_scheduled_changes(&mut self) {
        // Dispatch the forcible changes now; keep everything else queued.
        let mut forcible_changes = Vec::new();
        let mut filtered_changes_queue = Vec::new();
        for change in std::mem::take(&mut self.context.peer_changes_queue) {
            if change.can_force_apply() {
                forcible_changes.push(change.dispatch_with(self));
            } else {
                filtered_changes_queue.push(change);
            }
        }
        self.context.peer_changes_queue = filtered_changes_queue;
        // Merge the dispatched patches together with any pending patches
        // touching the same tracks.
        let mut deduper = TrackPatchDeduper::with_whitelist(
            forcible_changes
                .iter()
                .filter_map(|t| match t {
                    PeerChange::TrackPatch(patch)
                    | PeerChange::PartnerTrackPatch(patch) => Some(patch.id),
                    _ => None,
                })
                .collect(),
        );
        deduper.drain_merge(&mut self.context.pending_peer_changes);
        deduper.drain_merge(&mut forcible_changes);
        let updates: Vec<_> = deduper
            .into_inner()
            .map(|c| c.as_peer_update(self.partner_member_id().clone()))
            .collect();
        if !updates.is_empty() {
            // If the client already knows this Peer, schedule a renegotiation
            // once the current negotiation finishes.
            if self.context.is_known_to_remote {
                self.context.on_negotiation_finish =
                    OnNegotiationFinish::Renegotiate;
            }
            self.context
                .peer_updates_sub
                .force_update(self.id(), updates);
        }
    }
#[must_use]
pub fn is_known_to_remote(&self) -> bool {
self.context.is_known_to_remote
}
pub fn is_empty(&self) -> bool {
if self.context.senders.is_empty() && self.context.receivers.is_empty()
{
return true;
}
let removed_tracks: HashSet<_> = self
.context
.peer_changes_queue
.iter()
.filter_map(|change| match change {
PeerChange::RemoveTrack(track_id) => Some(*track_id),
_ => None,
})
.collect();
let peers_tracks: HashSet<_> = self
.context
.senders
.iter()
.chain(self.context.receivers.iter())
.map(|t| *t.0)
.collect();
removed_tracks == peers_tracks
}
#[inline]
#[must_use]
pub fn as_changes_scheduler(&mut self) -> PeerChangesScheduler {
PeerChangesScheduler {
context: &mut self.context,
}
}
fn dedup_pending_track_updates(&mut self) {
self.dedup_ice_restarts();
self.dedup_track_patches();
}
fn dedup_ice_restarts(&mut self) {
let pending_track_updates = &mut self.context.pending_peer_changes;
let last_ice_restart_rev_index = pending_track_updates
.iter()
.rev()
.position(|item| matches!(item, PeerChange::IceRestart));
if let Some(idx) = last_ice_restart_rev_index {
let last_ice_restart_index = pending_track_updates.len() - 1 - idx;
pending_track_updates.retain({
let mut i = 0;
move |item| {
let is_last_ice_restart = i == last_ice_restart_index;
i += 1;
is_last_ice_restart
|| !matches!(item, PeerChange::IceRestart)
}
});
}
}
fn dedup_track_patches(&mut self) {
let mut deduper = TrackPatchDeduper::new();
deduper.drain_merge(&mut self.context.pending_peer_changes);
self.context
.pending_peer_changes
.extend(deduper.into_inner());
}
#[inline]
pub fn add_ice_candidate(&mut self, ice_candidate: IceCandidate) {
self.context.ice_candidates.insert(ice_candidate);
}
#[inline]
#[must_use]
pub fn ice_candidates(&self) -> &HashSet<IceCandidate> {
&self.context.ice_candidates
}
#[inline]
#[must_use]
pub fn is_ice_restart(&self) -> bool {
self.context.ice_restart
}
#[inline]
#[must_use]
pub fn negotiation_role(&self) -> Option<NegotiationRole> {
self.context.negotiation_role.clone()
}
#[inline]
pub fn set_initialized(&mut self) {
self.context.initialization_state = InitializationState::Done;
}
}
impl Peer<WaitLocalSdp> {
    /// Saves the local SDP offer, transitioning into [`WaitRemoteSdp`].
    #[inline]
    #[must_use]
    pub fn set_local_offer(self, local_sdp: String) -> Peer<WaitRemoteSdp> {
        let mut context = self.context;
        context.local_sdp = Some(local_sdp);
        Peer {
            context,
            state: WaitRemoteSdp {},
        }
    }

    /// Saves the local SDP answer, finishing the negotiation and
    /// transitioning into [`Stable`].
    #[inline]
    #[must_use]
    pub fn set_local_answer(self, sdp_answer: String) -> Peer<Stable> {
        let mut context = self.context;
        context.local_sdp = Some(sdp_answer);
        let mut this = Peer {
            context,
            state: Stable {},
        };
        this.negotiation_finished();
        this
    }

    /// Assigns the transceiver `mid`s to this `Peer`'s tracks.
    ///
    /// # Errors
    ///
    /// Returns [`PeerError::MidsMismatch`] if no `mid` is provided for some
    /// track of this `Peer`.
    pub fn set_mids(
        &mut self,
        mut mids: HashMap<TrackId, String>,
    ) -> Result<(), PeerError> {
        let tracks = self
            .context
            .senders
            .iter_mut()
            .chain(self.context.receivers.iter_mut());
        for (id, track) in tracks {
            // BUGFIX: `id` is already a `&TrackId`; `mids.remove(&id)`
            // passed a `&&TrackId`, which requires an unsatisfiable
            // `TrackId: Borrow<&TrackId>` bound. Pass the reference as is.
            let mid = mids
                .remove(id)
                .ok_or_else(|| PeerError::MidsMismatch(track.id()))?;
            track.set_mid(mid)
        }
        Ok(())
    }

    /// Updates the publishing statuses of this `Peer`'s senders.
    pub fn update_senders_statuses(
        &self,
        senders_statuses: HashMap<TrackId, bool>,
    ) {
        for (track_id, is_publishing) in senders_statuses {
            if let Some(sender) = self.context.senders.get(&track_id) {
                sender.set_transceiver_enabled(is_publishing);
            }
        }
    }
}
impl Peer<WaitRemoteSdp> {
    /// Saves the remote SDP answer, finishing the negotiation and
    /// transitioning into [`Stable`].
    #[inline]
    #[must_use]
    pub fn set_remote_answer(self, sdp_answer: String) -> Peer<Stable> {
        let mut context = self.context;
        context.remote_sdp = Some(sdp_answer);
        let mut stable = Peer {
            context,
            state: Stable {},
        };
        stable.negotiation_finished();
        stable
    }

    /// Saves the remote SDP offer, transitioning into [`WaitLocalSdp`] in
    /// the answerer role.
    #[inline]
    #[must_use]
    pub fn set_remote_offer(self, sdp_offer: String) -> Peer<WaitLocalSdp> {
        let mut context = self.context;
        context.negotiation_role =
            Some(NegotiationRole::Answerer(sdp_offer.clone()));
        context.remote_sdp = Some(sdp_offer);
        Peer {
            context,
            state: WaitLocalSdp {},
        }
    }
}
impl Peer<Stable> {
pub fn new(
id: Id,
member_id: MemberId,
partner_peer: Id,
partner_member: MemberId,
is_force_relayed: bool,
peer_updates_sub: Rc<dyn PeerUpdatesSubscriber>,
) -> Self {
let context = Context {
id,
member_id,
partner_peer,
partner_member,
ice_user: None,
local_sdp: None,
remote_sdp: None,
receivers: HashMap::new(),
senders: HashMap::new(),
is_force_relayed,
endpoints: Vec::new(),
is_known_to_remote: false,
pending_peer_changes: Vec::new(),
peer_changes_queue: Vec::new(),
peer_updates_sub,
ice_candidates: HashSet::new(),
ice_restart: false,
negotiation_role: None,
on_negotiation_finish: OnNegotiationFinish::Noop,
initialization_state: InitializationState::InProgress,
};
Self {
context,
state: Stable {},
}
}
#[inline]
#[must_use]
pub fn start_as_offerer(self) -> Peer<WaitLocalSdp> {
let mut context = self.context;
context.local_sdp = None;
context.remote_sdp = None;
context.negotiation_role = Some(NegotiationRole::Offerer);
Peer {
context,
state: WaitLocalSdp {},
}
}
#[inline]
#[must_use]
pub fn start_as_answerer(self) -> Peer<WaitRemoteSdp> {
let mut context = self.context;
context.local_sdp = None;
context.remote_sdp = None;
Peer {
context,
state: WaitRemoteSdp {},
}
}
pub fn get_mids(&self) -> Result<HashMap<TrackId, String>, PeerError> {
let mut mids = HashMap::with_capacity(self.context.senders.len());
for (track_id, track) in &self.context.senders {
mids.insert(
*track_id,
track
.mid()
.ok_or_else(|| PeerError::MidsMismatch(track.id()))?,
);
}
Ok(mids)
}
    /// Dispatches all the scheduled changes, then either force-updates the
    /// client (when only negotiation-agnostic changes remain) or requests a
    /// new negotiation round. No-op until initialization is done and there
    /// is something to commit.
    fn commit_scheduled_changes(&mut self) {
        if self.context.initialization_state == InitializationState::Done
            && (!self.context.peer_changes_queue.is_empty()
                || matches!(
                    self.context.on_negotiation_finish,
                    OnNegotiationFinish::Renegotiate
                ))
        {
            // Changes not requiring negotiation (pure mute patches) are
            // collected separately.
            let mut negotiationless_changes = Vec::new();
            for task in std::mem::take(&mut self.context.peer_changes_queue) {
                let change = task.dispatch_with(self);
                if change.is_negotiation_state_agnostic() {
                    negotiationless_changes.push(change);
                } else {
                    self.context.pending_peer_changes.push(change);
                }
            }
            self.dedup_pending_track_updates();
            if self.context.pending_peer_changes.is_empty()
                && matches!(
                    self.context.on_negotiation_finish,
                    OnNegotiationFinish::Noop
                )
            {
                // Nothing needs negotiating: push the updates immediately.
                self.context.peer_updates_sub.force_update(
                    self.id(),
                    negotiationless_changes
                        .into_iter()
                        .map(|c| {
                            c.as_peer_update(self.partner_member_id().clone())
                        })
                        .collect(),
                );
            } else {
                // Fold the negotiation-agnostic changes into the pending set
                // and request a (re)negotiation.
                self.context
                    .pending_peer_changes
                    .append(&mut negotiationless_changes);
                self.context.peer_updates_sub.negotiation_needed(self.id());
                self.context.on_negotiation_finish = OnNegotiationFinish::Noop;
            }
        }
    }
    /// Resets the negotiation bookkeeping after a finished negotiation and
    /// immediately commits any changes scheduled meanwhile.
    fn negotiation_finished(&mut self) {
        self.context.ice_restart = false;
        self.context.is_known_to_remote = true;
        self.context.pending_peer_changes.clear();
        self.context.negotiation_role = None;
        self.commit_scheduled_changes();
    }
}
/// Scheduler of [`PeerChange`]s over a `Peer`'s [`Context`]; changes are
/// queued and applied later by the commit methods.
pub struct PeerChangesScheduler<'a> {
    /// Context of the `Peer` whose changes are being scheduled.
    context: &'a mut Context,
}
impl<'a> PeerChangesScheduler<'a> {
pub fn patch_tracks(&mut self, patches: Vec<TrackPatchCommand>) {
for patch in patches {
self.schedule_change(PeerChange::TrackPatch(patch.into()));
}
}
pub fn partner_patch_tracks(&mut self, patches: Vec<TrackPatchCommand>) {
for patch in patches {
self.schedule_change(PeerChange::PartnerTrackPatch(patch.into()));
}
}
#[inline]
pub fn restart_ice(&mut self) {
self.schedule_change(PeerChange::IceRestart);
}
    /// Creates media tracks for the publish endpoint `src` according to its
    /// publishing policies, scheduling them as senders on this `Peer` and as
    /// receivers on the `partner_peer`.
    pub fn add_publisher(
        &mut self,
        src: &WebRtcPublishEndpoint,
        partner_peer: &mut PeerStateMachine,
        tracks_counter: &Counter<TrackId>,
    ) {
        // Audio track, if audio publishing is not disabled.
        let audio_settings = src.audio_settings();
        if audio_settings.publish_policy != PublishPolicy::Disabled {
            let track_audio = Rc::new(MediaTrack::new(
                tracks_counter.next_id(),
                MediaType::Audio(AudioSettings {
                    required: audio_settings.publish_policy.required(),
                }),
            ));
            self.add_sender(Rc::clone(&track_audio));
            src.add_track_id(self.context.id, track_audio.id());
            partner_peer
                .as_changes_scheduler()
                .add_receiver(track_audio);
        }
        // Device (camera) and display video tracks, if video publishing is
        // not disabled.
        let video_settings = src.video_settings();
        if video_settings.publish_policy != PublishPolicy::Disabled {
            let camera_video_track = Rc::new(MediaTrack::new(
                tracks_counter.next_id(),
                MediaType::Video(VideoSettings {
                    required: video_settings.publish_policy.required(),
                    source_kind: MediaSourceKind::Device,
                }),
            ));
            self.add_sender(Rc::clone(&camera_video_track));
            src.add_track_id(self.context.id, camera_video_track.id());
            partner_peer
                .as_changes_scheduler()
                .add_receiver(camera_video_track);
            // Display (screen-share) video is never required.
            let display_video_track = Rc::new(MediaTrack::new(
                tracks_counter.next_id(),
                MediaType::Video(VideoSettings {
                    required: false,
                    source_kind: MediaSourceKind::Display,
                }),
            ));
            self.add_sender(Rc::clone(&display_video_track));
            src.add_track_id(self.context.id, display_video_track.id());
            partner_peer
                .as_changes_scheduler()
                .add_receiver(display_video_track);
        }
    }
    /// Pushes the `job` into the changes queue.
    #[inline]
    fn schedule_change(&mut self, job: PeerChange) {
        self.context.peer_changes_queue.push(job);
    }
    /// Schedules addition of a receiving `track`.
    #[inline]
    fn add_receiver(&mut self, track: Rc<MediaTrack>) {
        self.schedule_change(PeerChange::AddRecvTrack(track));
    }
    /// Schedules addition of a sending `track`.
    #[inline]
    fn add_sender(&mut self, track: Rc<MediaTrack>) {
        self.schedule_change(PeerChange::AddSendTrack(track));
    }
pub fn remove_tracks(&mut self, track_ids: &[TrackId]) {
track_ids.iter().for_each(|id| {
let changes_indexes_to_remove: Vec<_> = self
.context
.peer_changes_queue
.iter()
.enumerate()
.filter_map(|(n, change)| match change {
PeerChange::AddSendTrack(track)
| PeerChange::AddRecvTrack(track) => {
if track.id() == *id {
Some(n)
} else {
None
}
}
_ => None,
})
.collect();
if changes_indexes_to_remove.is_empty() {
self.schedule_change(PeerChange::RemoveTrack(*id))
} else {
for remove_index in changes_indexes_to_remove {
self.context.peer_changes_queue.remove(remove_index);
}
}
});
}
}
#[cfg(test)]
pub mod tests {
use super::*;
pub fn dummy_negotiation_sub_mock() -> Rc<dyn PeerUpdatesSubscriber> {
let mut mock = MockPeerUpdatesSubscriber::new();
mock.expect_negotiation_needed().returning(drop);
Rc::new(mock)
}
pub fn test_peer_from_peer_tracks(
send_audio: u32,
send_video: u32,
recv_audio: u32,
recv_video: u32,
) -> PeerStateMachine {
let mut peer = Peer::new(
Id(1),
MemberId::from("test-member"),
Id(2),
MemberId::from("partner-member"),
false,
dummy_negotiation_sub_mock(),
);
let track_id_counter = Counter::default();
for _ in 0..send_audio {
let track_id = track_id_counter.next_id();
let track = MediaTrack::new(
track_id,
MediaType::Audio(AudioSettings { required: true }),
);
peer.context.senders.insert(track_id, Rc::new(track));
}
for _ in 0..send_video {
let track_id = track_id_counter.next_id();
let track = MediaTrack::new(
track_id,
MediaType::Video(VideoSettings {
required: true,
source_kind: MediaSourceKind::Device,
}),
);
peer.context.senders.insert(track_id, Rc::new(track));
}
for _ in 0..recv_audio {
let track_id = track_id_counter.next_id();
let track = MediaTrack::new(
track_id,
MediaType::Audio(AudioSettings { required: true }),
);
peer.context.receivers.insert(track_id, Rc::new(track));
}
for _ in 0..recv_video {
let track_id = track_id_counter.next_id();
let track = MediaTrack::new(
track_id,
MediaType::Video(VideoSettings {
required: true,
source_kind: MediaSourceKind::Device,
}),
);
peer.context.receivers.insert(track_id, Rc::new(track));
}
peer.into()
}
fn media_track(track_id: u32) -> Rc<MediaTrack> {
Rc::new(MediaTrack::new(
TrackId(track_id),
MediaType::Video(VideoSettings {
required: true,
source_kind: MediaSourceKind::Device,
}),
))
}
#[test]
fn scheduled_changes_normally_ran() {
let (tx, rx) = std::sync::mpsc::channel();
let mut negotiation_sub = MockPeerUpdatesSubscriber::new();
negotiation_sub
.expect_negotiation_needed()
.returning(move |peer_id| {
tx.send(peer_id).unwrap();
});
let mut peer = Peer::new(
PeerId(0),
MemberId::from("member-1"),
PeerId(1),
MemberId::from("member-2"),
false,
Rc::new(negotiation_sub),
);
peer.set_ice_user(IceUser::new_static(
String::new(),
String::new(),
String::new(),
));
peer.set_initialized();
peer.as_changes_scheduler().add_receiver(media_track(0));
peer.as_changes_scheduler().add_sender(media_track(1));
assert!(peer.context.senders.is_empty());
assert!(peer.context.receivers.is_empty());
peer.commit_scheduled_changes();
assert_eq!(rx.recv().unwrap(), PeerId(0));
assert_eq!(peer.context.senders.len(), 1);
assert_eq!(peer.context.receivers.len(), 1);
}
#[test]
fn scheduled_changes_will_be_ran_on_stable() {
let (tx, rx) = std::sync::mpsc::channel();
let mut negotiation_sub = MockPeerUpdatesSubscriber::new();
negotiation_sub
.expect_negotiation_needed()
.returning(move |peer_id| {
tx.send(peer_id).unwrap();
});
let mut peer = Peer::new(
PeerId(0),
MemberId::from("member-1"),
PeerId(1),
MemberId::from("member-2"),
false,
Rc::new(negotiation_sub),
);
peer.set_ice_user(IceUser::new_static(
String::new(),
String::new(),
String::new(),
));
peer.set_initialized();
let mut peer = peer.start_as_offerer();
peer.as_changes_scheduler().add_sender(media_track(0));
peer.as_changes_scheduler().add_receiver(media_track(1));
assert!(peer.context.senders.is_empty());
assert!(peer.context.receivers.is_empty());
let peer = peer.set_local_offer(String::new());
assert!(peer.context.senders.is_empty());
assert!(peer.context.receivers.is_empty());
let peer = peer.set_remote_answer(String::new());
assert_eq!(peer.context.receivers.len(), 1);
assert_eq!(peer.context.senders.len(), 1);
assert_eq!(peer.context.pending_peer_changes.len(), 2);
assert_eq!(peer.context.peer_changes_queue.len(), 0);
assert_eq!(rx.recv().unwrap(), PeerId(0));
}
#[test]
fn force_updates_works() {
let (force_update_tx, force_update_rx) = std::sync::mpsc::channel();
let mut negotiation_sub = MockPeerUpdatesSubscriber::new();
negotiation_sub.expect_force_update().returning(
move |peer_id: PeerId, changes: Vec<PeerUpdate>| {
force_update_tx.send((peer_id, changes)).unwrap();
},
);
let (negotiation_needed_tx, negotiation_needed_rx) =
std::sync::mpsc::channel();
negotiation_sub.expect_negotiation_needed().returning(
move |peer_id: PeerId| {
negotiation_needed_tx.send(peer_id).unwrap();
},
);
let mut peer = Peer::new(
PeerId(0),
MemberId::from("member-1"),
PeerId(1),
MemberId::from("member-2"),
false,
Rc::new(negotiation_sub),
);
peer.set_ice_user(IceUser::new_static(
String::new(),
String::new(),
String::new(),
));
peer.set_initialized();
peer.context.is_known_to_remote = true;
peer.as_changes_scheduler().add_sender(media_track(0));
peer.as_changes_scheduler().add_receiver(media_track(1));
peer.commit_scheduled_changes();
let peer_id = negotiation_needed_rx.recv().unwrap();
assert_eq!(peer_id, PeerId(0));
let mut peer = peer.start_as_offerer();
peer.as_changes_scheduler().patch_tracks(vec![
TrackPatchCommand {
id: TrackId(0),
enabled: Some(false),
muted: None,
},
TrackPatchCommand {
id: TrackId(1),
enabled: Some(false),
muted: None,
},
]);
peer.inner_force_commit_scheduled_changes();
let (peer_id, changes) = force_update_rx.recv().unwrap();
assert_eq!(peer_id, PeerId(0));
assert_eq!(changes.len(), 2);
assert!(peer.context.peer_changes_queue.is_empty());
assert!(matches!(
peer.context.on_negotiation_finish,
OnNegotiationFinish::Renegotiate
));
let peer = peer.set_local_offer(String::new());
let _ = peer.set_remote_answer(String::new());
let peer_id = negotiation_needed_rx.recv().unwrap();
assert_eq!(peer_id, PeerId(0));
}
#[test]
fn track_patch_dedup_works() {
    // Subscriber stub: this test only inspects the resulting pending
    // changes, so both callbacks are no-ops.
    let mut updates_sub = MockPeerUpdatesSubscriber::new();
    updates_sub
        .expect_force_update()
        .returning(move |_: PeerId, _: Vec<PeerUpdate>| {});
    updates_sub
        .expect_negotiation_needed()
        .returning(move |_: PeerId| {});

    let mut peer = Peer::new(
        PeerId(0),
        MemberId::from("member-1"),
        PeerId(1),
        MemberId::from("member-2"),
        false,
        Rc::new(updates_sub),
    );

    // Shorthand for a `TrackPatchCommand` with no mute change.
    let patch = |id: u32, enabled: Option<bool>| TrackPatchCommand {
        id: TrackId(id),
        enabled,
        muted: None,
    };
    peer.as_changes_scheduler().patch_tracks(vec![
        patch(1, Some(false)),
        patch(2, None),
        patch(1, Some(true)),
        patch(2, Some(false)),
        patch(2, Some(true)),
        patch(2, None),
        patch(1, None),
    ]);

    let mut peer = PeerStateMachine::from(peer);
    peer.set_ice_user(IceUser::new_static(
        String::new(),
        String::new(),
        String::new(),
    ));
    peer.set_initialized();
    peer.commit_scheduled_changes();

    let peer = match peer {
        PeerStateMachine::Stable(p) => p,
        _ => unreachable!("Peer should be in Stable state."),
    };

    // Collect only the `TrackPatch` changes that made it into the
    // pending set after deduplication.
    let patches_after: Vec<_> = peer
        .context
        .pending_peer_changes
        .iter()
        .filter_map(|change| match change {
            PeerChange::TrackPatch(p) => Some(p.clone()),
            _ => None,
        })
        .collect();

    // Exactly one merged patch per track must survive.
    assert_eq!(patches_after.len(), 2);
    assert_eq!(patches_after[1].enabled_individual, Some(true));
    assert_eq!(patches_after[0].enabled_general, None);
}
#[test]
fn ice_restart_dedupping_works() {
    // No-op patch for `TrackId(0)` used to interleave the ICE restarts.
    let noop_patch = || {
        PeerChange::TrackPatch(TrackPatchEvent {
            id: TrackId(0),
            enabled_individual: None,
            enabled_general: None,
            muted: None,
        })
    };
    let changes = vec![
        PeerChange::IceRestart,
        PeerChange::IceRestart,
        PeerChange::IceRestart,
        noop_patch(),
        PeerChange::IceRestart,
        noop_patch(),
    ];

    // Subscriber stub: callbacks are irrelevant to this test.
    let mut updates_sub = MockPeerUpdatesSubscriber::new();
    updates_sub
        .expect_force_update()
        .returning(move |_: PeerId, _: Vec<PeerUpdate>| {});
    updates_sub
        .expect_negotiation_needed()
        .returning(move |_: PeerId| {});

    let mut peer = Peer::new(
        PeerId(0),
        MemberId::from("member-1"),
        PeerId(1),
        MemberId::from("member-2"),
        false,
        Rc::new(updates_sub),
    );
    peer.context.pending_peer_changes = changes;
    peer.dedup_ice_restarts();

    // Duplicate `IceRestart`s are collapsed: a single restart survives
    // alongside the two track patches.
    let deduped = peer.context.pending_peer_changes;
    assert_eq!(deduped.len(), 3);
    assert!(matches!(deduped[1], PeerChange::IceRestart));
}
/// Checks that a forced commit merges freshly scheduled patches with the
/// already pending ones, pushing only the net result to the subscriber
/// and dropping patches that cancel each other out.
#[test]
fn force_update_dedups_normally() {
    let mut peer_updates_sub = MockPeerUpdatesSubscriber::new();
    // Exactly one forced update is expected, carrying the single patch
    // that survives deduplication: `TrackId(0)` disabled.
    peer_updates_sub.expect_force_update().times(1).returning(
        |peer_id, changes| {
            assert_eq!(peer_id, PeerId(0));
            assert_eq!(changes.len(), 1);
            if let PeerUpdate::Updated(patch) = &changes[0] {
                assert_eq!(patch.id, TrackId(0));
                assert_eq!(patch.enabled_individual, Some(false));
            } else {
                unreachable!();
            }
        },
    );
    let mut peer = Peer::new(
        PeerId(0),
        MemberId::from("alice"),
        PeerId(1),
        MemberId::from("bob"),
        false,
        Rc::new(peer_updates_sub),
    );
    // Pre-existing pending changes: two opposing patches for
    // `TrackId(0)` and one patch for `TrackId(1)`.
    peer.context.pending_peer_changes = vec![
        PeerChange::TrackPatch(TrackPatchEvent {
            id: TrackId(0),
            enabled_general: Some(false),
            enabled_individual: Some(false),
            muted: None,
        }),
        PeerChange::TrackPatch(TrackPatchEvent {
            id: TrackId(0),
            enabled_general: Some(true),
            enabled_individual: Some(true),
            muted: None,
        }),
        PeerChange::TrackPatch(TrackPatchEvent {
            id: TrackId(1),
            enabled_general: Some(false),
            enabled_individual: Some(false),
            muted: None,
        }),
    ];
    // Newly scheduled patches whose net effect on `TrackId(0)` is
    // `enabled: Some(false)`.
    peer.as_changes_scheduler().patch_tracks(vec![
        TrackPatchCommand {
            id: TrackId(0),
            enabled: Some(false),
            muted: None,
        },
        TrackPatchCommand {
            id: TrackId(0),
            enabled: Some(true),
            muted: None,
        },
        TrackPatchCommand {
            id: TrackId(0),
            enabled: Some(false),
            muted: None,
        },
    ]);
    peer.inner_force_commit_scheduled_changes();
    // The scheduling queue must be drained, and only the `TrackId(1)`
    // patch stays pending.
    assert!(peer.context.peer_changes_queue.is_empty());
    assert_eq!(peer.context.pending_peer_changes.len(), 1);
    let filtered_track_change =
        peer.context.pending_peer_changes.pop().unwrap();
    if let PeerChange::TrackPatch(patch) = filtered_track_change {
        assert_eq!(patch.id, TrackId(1));
        assert_eq!(patch.enabled_general, Some(false));
    } else {
        unreachable!();
    }
}
/// Tests of the [`TrackPatchDeduper`] in isolation.
mod track_patch_deduper {
    use super::*;

    /// Only patches for whitelisted [`TrackId`]s are drained and merged;
    /// everything else is left in the input untouched.
    #[test]
    fn whitelisting_works() {
        let mut deduper =
            TrackPatchDeduper::with_whitelist(hashset![TrackId(1)]);
        // Not in the whitelist — must survive in the input.
        let filtered_patch = PeerChange::TrackPatch(TrackPatchEvent {
            id: TrackId(2),
            enabled_general: Some(false),
            enabled_individual: Some(false),
            muted: None,
        });
        // Whitelisted — must be drained into the deduper.
        let whitelisted_patch = PeerChange::TrackPatch(TrackPatchEvent {
            id: TrackId(1),
            enabled_general: Some(false),
            enabled_individual: Some(false),
            muted: None,
        });
        let mut patches =
            vec![whitelisted_patch.clone(), filtered_patch.clone()];
        deduper.drain_merge(&mut patches);
        assert_eq!(patches.len(), 1);
        assert_eq!(patches[0], filtered_patch);
        let merged_changes: Vec<_> = deduper.into_inner().collect();
        assert_eq!(merged_changes.len(), 1);
        assert_eq!(merged_changes[0], whitelisted_patch);
    }

    /// Multiple patches for the same track merge into one (later values
    /// win), while non-patch changes are left in the input untouched.
    #[test]
    fn merging_works() {
        let mut deduper = TrackPatchDeduper::new();
        let mut changes: Vec<_> = vec![
            TrackPatchEvent {
                id: TrackId(1),
                enabled_general: Some(true),
                enabled_individual: Some(true),
                muted: None,
            },
            TrackPatchEvent {
                id: TrackId(2),
                enabled_general: Some(false),
                enabled_individual: Some(false),
                muted: None,
            },
            TrackPatchEvent {
                id: TrackId(1),
                enabled_general: Some(false),
                enabled_individual: Some(false),
                muted: None,
            },
            TrackPatchEvent {
                id: TrackId(1),
                enabled_general: None,
                enabled_individual: None,
                muted: None,
            },
            TrackPatchEvent {
                id: TrackId(2),
                enabled_general: Some(true),
                enabled_individual: Some(true),
                muted: None,
            },
        ]
        .into_iter()
        .map(PeerChange::TrackPatch)
        .collect();
        // A non-patch change: the deduper must not touch it.
        let unrelated_change =
            PeerChange::AddSendTrack(Rc::new(MediaTrack::new(
                TrackId(1),
                MediaType::Audio(AudioSettings { required: true }),
            )));
        changes.push(unrelated_change.clone());
        deduper.drain_merge(&mut changes);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0], unrelated_change);
        let merged_changes: HashMap<_, _> = deduper
            .into_inner()
            .filter_map(|t| {
                if let PeerChange::TrackPatch(patch) = t {
                    Some((patch.id, patch))
                } else {
                    None
                }
            })
            .collect();
        // One merged patch per track; the last patch's values win.
        assert_eq!(merged_changes.len(), 2);
        {
            let track_1 = merged_changes.get(&TrackId(1)).unwrap();
            assert_eq!(track_1.enabled_general, Some(false));
        }
        {
            let track_2 = merged_changes.get(&TrackId(2)).unwrap();
            assert_eq!(track_2.enabled_general, Some(true));
        }
    }
}
mod negotiation_state_agnostic_patches {
use super::*;
#[test]
fn negotiation_state_agnostic_change_dont_trigger_negotiation() {
let track_patch = TrackPatchEvent {
id: TrackId(0),
muted: Some(true),
enabled_individual: None,
enabled_general: None,
};
let changes = vec![PeerChange::TrackPatch(track_patch.clone())];
let mut negotiation_sub = MockPeerUpdatesSubscriber::new();
negotiation_sub.expect_force_update().times(1).return_once(
move |peer_id: PeerId, updates: Vec<PeerUpdate>| {
assert_eq!(peer_id, PeerId(0));
assert_eq!(updates, vec![PeerUpdate::Updated(track_patch)]);
},
);
let mut peer = Peer::new(
PeerId(0),
MemberId::from("member-1"),
PeerId(1),
MemberId::from("member-2"),
false,
Rc::new(negotiation_sub),
);
peer.set_ice_user(IceUser::new_static(
String::new(),
String::new(),
String::new(),
));
peer.set_initialized();
peer.context.peer_changes_queue = changes;
peer.commit_scheduled_changes();
}
#[test]
fn mixed_changeset_will_trigger_negotiation() {
let changes = vec![
PeerChange::TrackPatch(TrackPatchEvent {
id: TrackId(0),
muted: Some(true),
enabled_individual: None,
enabled_general: None,
}),
PeerChange::TrackPatch(TrackPatchEvent {
id: TrackId(1),
muted: None,
enabled_individual: Some(true),
enabled_general: Some(true),
}),
];
let mut negotiation_sub = MockPeerUpdatesSubscriber::new();
negotiation_sub
.expect_negotiation_needed()
.times(1)
.return_once(move |peer_id: PeerId| {
assert_eq!(peer_id, PeerId(0));
});
let mut peer = Peer::new(
PeerId(0),
MemberId::from("member-1"),
PeerId(1),
MemberId::from("member-2"),
false,
Rc::new(negotiation_sub),
);
peer.set_ice_user(IceUser::new_static(
String::new(),
String::new(),
String::new(),
));
peer.set_initialized();
peer.context.peer_changes_queue = changes.clone();
peer.commit_scheduled_changes();
let reversed_changes = {
let mut changes = changes;
changes.reverse();
changes
};
assert_eq!(peer.context.pending_peer_changes, reversed_changes);
}
#[test]
fn mixed_change_will_trigger_negotiation() {
let changes = vec![PeerChange::TrackPatch(TrackPatchEvent {
id: TrackId(0),
muted: Some(true),
enabled_individual: Some(true),
enabled_general: Some(true),
})];
let mut negotiation_sub = MockPeerUpdatesSubscriber::new();
negotiation_sub
.expect_negotiation_needed()
.times(1)
.return_once(move |peer_id: PeerId| {
assert_eq!(peer_id, PeerId(0));
});
let mut peer = Peer::new(
PeerId(0),
MemberId::from("member-1"),
PeerId(1),
MemberId::from("member-2"),
false,
Rc::new(negotiation_sub),
);
peer.set_ice_user(IceUser::new_static(
String::new(),
String::new(),
String::new(),
));
peer.set_initialized();
peer.context.peer_changes_queue = changes.clone();
peer.commit_scheduled_changes();
assert_eq!(peer.context.pending_peer_changes, changes);
}
}
/// Tests that [`PeerStateMachine::get_state`] reflects the negotiation
/// phase and scheduled track/ICE changes.
mod state_generation {
    use std::convert::TryInto;

    use super::*;

    /// Builds an initialized [`Peer`] with a no-op subscriber mock.
    fn peer() -> Peer<Stable> {
        let mut negotiation_sub = MockPeerUpdatesSubscriber::new();
        negotiation_sub
            .expect_negotiation_needed()
            .returning(|_: PeerId| ());
        negotiation_sub
            .expect_force_update()
            .returning(|_: PeerId, _: Vec<PeerUpdate>| ());
        let mut peer = Peer::new(
            PeerId(0),
            MemberId::from("member-1"),
            PeerId(0),
            MemberId::from("member-2"),
            false,
            Rc::new(negotiation_sub),
        );
        peer.set_ice_user(IceUser::new_static(
            String::new(),
            String::new(),
            String::new(),
        ));
        peer.set_initialized();
        peer
    }

    /// Offerer keeps the `Offerer` role until the remote answer lands.
    #[test]
    fn offerer_role() {
        let peer = peer();
        let peer = peer.start_as_offerer();
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        assert_eq!(state.negotiation_role, Some(NegotiationRole::Offerer));
        let peer: Peer<WaitLocalSdp> = peer.try_into().unwrap();
        let peer = peer.set_local_offer(String::new());
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        assert_eq!(state.negotiation_role, Some(NegotiationRole::Offerer));
        let peer: Peer<WaitRemoteSdp> = peer.try_into().unwrap();
        let peer = peer.set_remote_answer(String::new());
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        // Negotiation finished — role is cleared.
        assert_eq!(state.negotiation_role, None);
    }

    /// Answerer gets its role (carrying the remote SDP) only after the
    /// remote offer is set, and loses it after answering.
    #[test]
    fn answerer_role() {
        let peer = peer();
        let peer = peer.start_as_answerer();
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        assert_eq!(state.negotiation_role, None);
        let peer: Peer<WaitRemoteSdp> = peer.try_into().unwrap();
        let peer = peer.set_remote_offer(String::from("SDP"));
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        assert_eq!(
            state.negotiation_role,
            Some(NegotiationRole::Answerer(String::from("SDP")))
        );
        let peer: Peer<WaitLocalSdp> = peer.try_into().unwrap();
        let peer = peer.set_local_answer(String::new());
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        assert_eq!(state.negotiation_role, None);
    }

    /// A scheduled ICE restart is reflected in the generated state.
    #[test]
    fn ice_restart() {
        let mut peer = peer();
        peer.as_changes_scheduler().restart_ice();
        peer.commit_scheduled_changes();
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        assert!(state.restart_ice);
    }

    /// A committed sender patch shows up in the sender's track state.
    #[test]
    fn sender_patch() {
        let mut peer = peer();
        peer.context.senders.insert(
            TrackId(0),
            Rc::new(MediaTrack::new(
                TrackId(0),
                MediaType::Audio(AudioSettings { required: true }),
            )),
        );
        peer.as_changes_scheduler()
            .patch_tracks(vec![TrackPatchCommand {
                id: TrackId(0),
                muted: Some(true),
                enabled: Some(false),
            }]);
        peer.commit_scheduled_changes();
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        let track_state = state.senders.get(&TrackId(0)).unwrap();
        assert!(track_state.muted);
        assert!(!track_state.enabled_general);
        assert!(!track_state.enabled_individual);
    }

    /// A committed receiver patch shows up in the receiver's track
    /// state.
    #[test]
    fn receiver_patch() {
        let mut peer = peer();
        peer.context.receivers.insert(
            TrackId(0),
            Rc::new(MediaTrack::new(
                TrackId(0),
                MediaType::Audio(AudioSettings { required: true }),
            )),
        );
        peer.as_changes_scheduler()
            .patch_tracks(vec![TrackPatchCommand {
                id: TrackId(0),
                muted: Some(true),
                enabled: Some(false),
            }]);
        peer.commit_scheduled_changes();
        let peer = PeerStateMachine::from(peer);
        let state = peer.get_state();
        let track_state = state.receivers.get(&TrackId(0)).unwrap();
        assert!(track_state.muted);
        assert!(!track_state.enabled_general);
        assert!(!track_state.enabled_individual);
    }
}
}