use actix::{Handler, Message, StreamHandler, WeakAddr};
use chrono::{DateTime, Utc};
use medea_client_api_proto::{
ConnectionQualityScore, Event, MemberId, NegotiationRole, PeerId,
PeerUpdate,
};
use crate::{
api::control::callback::{MediaDirection, MediaType},
log::prelude::*,
media::{peer::PeerUpdatesSubscriber, Peer, PeerStateMachine, Stable},
signalling::{
peers::{
PeerConnectionStateEventsHandler, PeersMetricsEvent,
PeersMetricsEventHandler,
},
room::RoomError,
Room,
},
};
impl Room {
fn send_peer_created(&self, peer_id: PeerId) -> Result<(), RoomError> {
let peer: Peer<Stable> = self.peers.take_inner_peer(peer_id)?;
let partner_peer: Peer<Stable> =
match self.peers.take_inner_peer(peer.partner_peer_id()) {
Ok(p) => p,
Err(e) => {
self.peers.add_peer(peer);
return Err(e);
}
};
let ice_servers = if let Some(ice_servers) = peer.ice_servers_list() {
ice_servers
} else {
let member_id = peer.member_id().clone();
self.peers.add_peer(peer);
self.peers.add_peer(partner_peer);
return Err(RoomError::NoTurnCredentials(member_id));
};
let peer = peer.start_as_offerer();
let partner_peer = partner_peer.start_as_answerer();
let peer_created = Event::PeerCreated {
peer_id: peer.id(),
negotiation_role: NegotiationRole::Offerer,
tracks: peer.new_tracks(),
ice_servers,
force_relay: peer.is_force_relayed(),
};
self.members
.send_event_to_member(peer.member_id(), peer_created);
self.peers.add_peer(peer);
self.peers.add_peer(partner_peer);
Ok(())
}
}
impl PeerConnectionStateEventsHandler for WeakAddr<Room> {
fn peer_started(&self, peer_id: PeerId) {
if let Some(addr) = self.upgrade() {
addr.do_send(PeerStarted(peer_id));
}
}
fn peer_stopped(&self, peer_id: PeerId, at: DateTime<Utc>) {
if let Some(addr) = self.upgrade() {
addr.do_send(PeerStopped { peer_id, at })
}
}
}
impl StreamHandler<PeersMetricsEvent> for Room {
fn handle(&mut self, event: PeersMetricsEvent, _: &mut Self::Context) {
if let Err(err) = event.dispatch_with(self) {
error!("Error handling PeersMetricsEvent: {:?}", err);
}
}
}
impl PeersMetricsEventHandler for Room {
type Output = Result<(), RoomError>;
fn on_no_traffic_flow(
&mut self,
_: PeerId,
_: DateTime<Utc>,
_: MediaType,
_: MediaDirection,
) -> Self::Output {
Ok(())
}
fn on_traffic_flows(
&mut self,
_: PeerId,
_: MediaType,
_: MediaDirection,
) -> Self::Output {
Ok(())
}
fn on_quality_meter_update(
&mut self,
member_id: MemberId,
partner_member_id: MemberId,
quality_score: ConnectionQualityScore,
) -> Self::Output {
self.members.send_event_to_member(
&member_id,
Event::ConnectionQualityUpdated {
partner_member_id,
quality_score,
},
);
Ok(())
}
fn on_peer_connection_failed(&mut self, peer_id: PeerId) -> Self::Output {
debug!("PeerConnection failed [peer_id = {}].", peer_id);
self.peers.map_peer_by_id_mut(peer_id, |peer| {
peer.as_changes_scheduler().restart_ice();
})?;
self.peers.commit_scheduled_changes(peer_id)?;
Ok(())
}
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct PeerStarted(pub PeerId);
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct PeerStopped {
peer_id: PeerId,
at: DateTime<Utc>,
}
impl Handler<PeerStarted> for Room {
type Result = ();
fn handle(
&mut self,
_: PeerStarted,
_: &mut Self::Context,
) -> Self::Result {
}
}
impl Handler<PeerStopped> for Room {
type Result = ();
fn handle(
&mut self,
_: PeerStopped,
_: &mut Self::Context,
) -> Self::Result {
}
}
impl PeerUpdatesSubscriber for WeakAddr<Room> {
#[inline]
fn negotiation_needed(&self, peer_id: PeerId) {
if let Some(addr) = self.upgrade() {
addr.do_send(NegotiationNeeded(peer_id));
}
}
#[inline]
fn force_update(&self, peer_id: PeerId, changes: Vec<PeerUpdate>) {
if let Some(addr) = self.upgrade() {
addr.do_send(ForceUpdate(peer_id, changes));
}
}
}
#[derive(Message, Clone, Debug)]
#[rtype(result = "Result<(), RoomError>")]
pub struct ForceUpdate(PeerId, Vec<PeerUpdate>);
impl Handler<ForceUpdate> for Room {
type Result = Result<(), RoomError>;
fn handle(
&mut self,
msg: ForceUpdate,
_: &mut Self::Context,
) -> Self::Result {
self.peers.map_peer_by_id(msg.0, |peer| {
self.members.send_event_to_member(
peer.member_id(),
Event::PeerUpdated {
peer_id: msg.0,
updates: msg.1,
negotiation_role: None,
},
);
})
}
}
#[derive(Message, Clone, Debug, Copy)]
#[rtype(result = "Result<(), RoomError>")]
pub struct NegotiationNeeded(pub PeerId);
impl Handler<NegotiationNeeded> for Room {
type Result = Result<(), RoomError>;
fn handle(
&mut self,
msg: NegotiationNeeded,
_: &mut Self::Context,
) -> Self::Result {
let peer_id = msg.0;
self.peers.update_peer_tracks(peer_id)?;
let peer: Peer<Stable> =
if let Ok(peer) = self.peers.take_inner_peer(msg.0) {
peer
} else {
return Ok(());
};
let is_partner_stable = match self
.peers
.map_peer_by_id(peer.partner_peer_id(), PeerStateMachine::is_stable)
{
Ok(r) => r,
Err(e) => {
self.peers.add_peer(peer);
return Err(e);
}
};
let is_known_to_remote = peer.is_known_to_remote();
self.peers.add_peer(peer);
if is_partner_stable {
if is_known_to_remote {
self.send_tracks_applied(peer_id)
} else {
self.send_peer_created(peer_id)
}
} else {
Ok(())
}
}
}