use std::{collections::HashMap, time::Instant};
use actix::{
fut::wrap_future, AsyncContext, Context, ContextFutureSpawner as _,
SpawnHandle,
};
use derive_more::Display;
use failure::Fail;
use futures::future::{self, FutureExt as _, LocalBoxFuture};
use medea_client_api_proto::{
CloseDescription, CloseReason, Credential, Event, MemberId, RoomId,
};
use crate::{
api::{
client::rpc_connection::{
ClosedReason, RpcConnection, RpcConnectionClosed,
},
control::{
refs::{Fid, ToEndpoint, ToMember},
MemberSpec, RoomSpec,
},
},
conf::Rpc as RpcConf,
log::prelude::*,
signalling::{
elements::{
endpoints::webrtc::{WebRtcPlayEndpoint, WebRtcPublishEndpoint},
member::MemberError,
parse_members, Member, MembersLoadError,
},
room::RoomError,
Room,
},
AppContext,
};
#[derive(Debug, Display, Fail)]
pub enum ParticipantServiceErr {
#[display(fmt = "Participant [id = {}] not found", _0)]
ParticipantNotFound(Fid<ToMember>),
#[display(fmt = "Endpoint [id = {}] not found.", _0)]
EndpointNotFound(Fid<ToEndpoint>),
MemberError(MemberError),
}
impl From<MemberError> for ParticipantServiceErr {
fn from(err: MemberError) -> Self {
Self::MemberError(err)
}
}
#[derive(Debug)]
pub struct ParticipantService {
room_id: RoomId,
members: HashMap<MemberId, Member>,
connections: HashMap<MemberId, Box<dyn RpcConnection>>,
drop_connection_tasks: HashMap<MemberId, SpawnHandle>,
rpc_conf: RpcConf,
}
impl ParticipantService {
pub fn new(
room_spec: &RoomSpec,
context: &AppContext,
) -> Result<Self, MembersLoadError> {
Ok(Self {
room_id: room_spec.id().clone(),
members: parse_members(room_spec, context.config.rpc)?,
connections: HashMap::new(),
drop_connection_tasks: HashMap::new(),
rpc_conf: context.config.rpc,
})
}
pub fn get_member_by_id(
&self,
id: &MemberId,
) -> Result<Member, ParticipantServiceErr> {
self.members.get(id).cloned().ok_or_else(|| {
ParticipantServiceErr::ParticipantNotFound(
self.get_fid_to_member(id.clone()),
)
})
}
#[inline]
#[must_use]
pub fn get_fid_to_member(&self, member_id: MemberId) -> Fid<ToMember> {
Fid::<ToMember>::new(self.room_id.clone(), member_id)
}
pub fn get_member(
&self,
id: &MemberId,
) -> Result<Member, ParticipantServiceErr> {
self.members.get(id).cloned().ok_or_else(|| {
ParticipantServiceErr::ParticipantNotFound(
self.get_fid_to_member(id.clone()),
)
})
}
#[inline]
#[must_use]
pub fn members(&self) -> HashMap<MemberId, Member> {
self.members.clone()
}
#[inline]
#[must_use]
pub fn members_ids(&self) -> Vec<MemberId> {
self.members.keys().cloned().collect()
}
pub fn get_member_by_id_and_credentials(
&self,
member_id: &MemberId,
credentials: &Credential,
) -> Result<Member, RoomError> {
#[allow(clippy::map_err_ignore)]
let member = self
.get_member_by_id(member_id)
.map_err(|_| RoomError::AuthorizationError)?;
if member.verify_credentials(credentials) {
Ok(member)
} else {
Err(RoomError::AuthorizationError)
}
}
#[inline]
#[must_use]
pub fn member_has_connection(&self, member_id: &MemberId) -> bool {
self.connections.contains_key(member_id)
&& !self.drop_connection_tasks.contains_key(member_id)
}
pub fn send_event_to_member(&self, member_id: &MemberId, event: Event) {
if let Some(conn) = self.connections.get(&member_id) {
conn.send_event(self.room_id.clone(), event);
} else {
debug!(
"Can't send event [{:?}] cause connection with Member [{}] \
does not exist.",
event, member_id
);
}
}
pub fn connection_established(
&mut self,
ctx: &mut Context<Room>,
member_id: MemberId,
conn: Box<dyn RpcConnection>,
) -> LocalBoxFuture<'static, Result<Member, ParticipantServiceErr>> {
let member = match self.get_member_by_id(&member_id) {
Err(err) => {
return Box::pin(future::err(err));
}
Ok(member) => member,
};
if let Some(mut connection) = self.connections.remove(&member_id) {
debug!("Closing old RpcConnection for member [id = {}]", member_id);
if let Some(handler) = self.drop_connection_tasks.remove(&member_id)
{
ctx.cancel_future(handler);
}
self.insert_connection(member_id, conn);
Box::pin(
connection
.close(
self.room_id.clone(),
CloseDescription::new(CloseReason::Reconnected),
)
.map(move |_| Ok(member)),
)
} else {
self.insert_connection(member_id, conn);
Box::pin(future::ok(member))
}
}
fn insert_connection(
&mut self,
member_id: MemberId,
conn: Box<dyn RpcConnection>,
) {
self.connections.insert(member_id, conn);
}
pub fn connection_lost(
&mut self,
member_id: MemberId,
ctx: &mut Context<Room>,
) {
let lost_at = Instant::now();
if let Ok(member) = self.get_member_by_id(&member_id) {
self.drop_connection_tasks.insert(
member_id.clone(),
ctx.run_later(member.get_reconnect_timeout(), move |_, ctx| {
info!(
"Member [id = {}] connection lost at {:?}.",
member_id, lost_at,
);
ctx.notify(RpcConnectionClosed {
member_id,
reason: ClosedReason::Closed { normal: false },
})
}),
);
}
}
pub fn drop_connections(
&mut self,
ctx: &mut Context<Room>,
) -> LocalBoxFuture<'static, ()> {
self.drop_connection_tasks.drain().for_each(|(_, handle)| {
ctx.cancel_future(handle);
});
let room_id = self.room_id.clone();
let close_rpc_connections =
future::join_all(self.connections.drain().fold(
Vec::new(),
|mut futs, (_, mut connection)| {
futs.push(connection.close(
room_id.clone(),
CloseDescription::new(CloseReason::Finished),
));
futs
},
));
close_rpc_connections.map(drop).boxed_local()
}
pub fn delete_member(&mut self, member_id: &MemberId) {
self.members.remove(member_id);
}
pub fn close_member_connection(
&mut self,
member_id: &MemberId,
close_reason: CloseReason,
ctx: &mut Context<Room>,
) {
if let Some(drop) = self.drop_connection_tasks.remove(member_id) {
ctx.cancel_future(drop);
}
if let Some(mut conn) = self.connections.remove(member_id) {
wrap_future::<_, Room>(conn.close(
self.room_id.clone(),
CloseDescription::new(close_reason),
))
.spawn(ctx);
}
}
pub fn insert_member(&mut self, id: MemberId, member: Member) {
self.members.insert(id, member);
}
pub fn iter_members(&self) -> impl Iterator<Item = (&MemberId, &Member)> {
self.members.iter()
}
pub fn create_member(
&mut self,
id: MemberId,
spec: &MemberSpec,
) -> Result<(), RoomError> {
if self.get_member_by_id(&id).is_ok() {
return Err(RoomError::MemberAlreadyExists(
self.get_fid_to_member(id),
));
}
let signalling_member = Member::new(
id.clone(),
spec.credentials().clone(),
self.room_id.clone(),
spec.idle_timeout().unwrap_or(self.rpc_conf.idle_timeout),
spec.reconnect_timeout()
.unwrap_or(self.rpc_conf.reconnect_timeout),
spec.ping_interval().unwrap_or(self.rpc_conf.ping_interval),
);
signalling_member.set_callback_urls(spec);
for (id, publish) in spec.publish_endpoints() {
let signalling_publish = WebRtcPublishEndpoint::new(
id.clone(),
publish.p2p,
signalling_member.downgrade(),
publish.force_relay,
publish.audio_settings,
publish.video_settings,
);
signalling_member.insert_src(signalling_publish);
}
for (id, play) in spec.play_endpoints() {
let partner_member = self.get_member(&play.src.member_id)?;
let src = partner_member
.get_src_by_id(&play.src.endpoint_id)
.ok_or_else(|| {
MemberError::EndpointNotFound(
partner_member.get_fid_to_endpoint(
play.src.endpoint_id.clone().into(),
),
)
})?;
let sink = WebRtcPlayEndpoint::new(
id.clone(),
play.src.clone(),
src.downgrade(),
signalling_member.downgrade(),
play.force_relay,
);
signalling_member.insert_sink(sink);
}
for (_, sink) in signalling_member.sinks() {
let src = sink.src();
src.add_sink(sink.downgrade());
}
self.insert_member(id, signalling_member);
Ok(())
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use crate::{
api::control::{member::Credential, pipeline::Pipeline},
conf::Conf,
};
use super::*;
pub fn empty_participants_service() -> ParticipantService {
let room_spec = RoomSpec {
id: RoomId::from("test"),
pipeline: Pipeline::new(HashMap::new()),
};
let ctx = AppContext::new(
Conf::default(),
crate::turn::new_turn_auth_service_mock(),
);
ParticipantService::new(&room_spec, &ctx).unwrap()
}
#[test]
fn use_conf_when_no_rpc_settings_in_member_spec() {
let mut members = empty_participants_service();
let test_member_spec = MemberSpec::new(
Pipeline::new(HashMap::new()),
Credential::Plain("w/e".into()),
None,
None,
None,
None,
None,
);
let test_member_id = MemberId::from("test-member");
members
.create_member(test_member_id.clone(), &test_member_spec)
.unwrap();
let test_member = members.get_member_by_id(&test_member_id).unwrap();
let default_rpc_conf = Conf::default().rpc;
assert_eq!(
test_member.get_ping_interval(),
default_rpc_conf.ping_interval
);
assert_eq!(
test_member.get_idle_timeout(),
default_rpc_conf.idle_timeout
);
assert_eq!(
test_member.get_reconnect_timeout(),
default_rpc_conf.reconnect_timeout
);
}
#[test]
fn use_rpc_settings_from_member_spec() {
let mut members = empty_participants_service();
let idle_timeout = Duration::from_secs(60);
let ping_interval = Duration::from_secs(61);
let reconnect_timeout = Duration::from_secs(62);
let test_member_spec = MemberSpec::new(
Pipeline::new(HashMap::new()),
Credential::Plain("w/e".into()),
None,
None,
Some(idle_timeout),
Some(reconnect_timeout),
Some(ping_interval),
);
let test_member_id = MemberId::from("test-member");
members
.create_member(test_member_id.clone(), &test_member_spec)
.unwrap();
let test_member = members.get_member_by_id(&test_member_id).unwrap();
assert_eq!(test_member.get_ping_interval(), ping_interval);
assert_eq!(test_member.get_idle_timeout(), idle_timeout);
assert_eq!(test_member.get_reconnect_timeout(), reconnect_timeout);
}
}