use std::{
cell::RefCell,
collections::HashMap,
convert::TryFrom as _,
rc::{Rc, Weak},
time::Duration,
};
use derive_more::Display;
use failure::Fail;
use medea_client_api_proto::{self as client_proto, MemberId, PeerId, RoomId};
use medea_control_api_proto::grpc::api as proto;
use crate::{
api::control::{
callback::url::CallbackUrl,
member::Credential,
refs::{Fid, StatefulFid, ToEndpoint, ToMember, ToRoom},
EndpointId, MemberSpec, RoomSpec, TryFromElementError, WebRtcPlayId,
WebRtcPublishId,
},
conf,
log::prelude::*,
};
use super::endpoints::{
webrtc::{WebRtcPlayEndpoint, WebRtcPublishEndpoint},
Endpoint,
};
#[derive(Debug, Display, Fail)]
pub enum MembersLoadError {
#[display(fmt = "TryFromElementError: {}", _0)]
TryFromError(TryFromElementError, StatefulFid),
#[display(fmt = "Member [id = {}] not found", _0)]
MemberNotFound(Fid<ToMember>),
#[display(
fmt = "Endpoint [id = {}] was referenced but not found in spec",
_0
)]
EndpointNotFound(String),
}
#[allow(clippy::pub_enum_variant_names)]
#[derive(Debug, Fail, Display)]
pub enum MemberError {
#[display(fmt = "Endpoint [id = {}] not found.", _0)]
EndpointNotFound(Fid<ToEndpoint>),
}
#[derive(Clone, Debug)]
pub struct Member(Rc<RefCell<MemberInner>>);
#[derive(Debug)]
struct MemberInner {
room_id: RoomId,
id: MemberId,
srcs: HashMap<WebRtcPublishId, WebRtcPublishEndpoint>,
sinks: HashMap<WebRtcPlayId, WebRtcPlayEndpoint>,
credentials: Credential,
on_join: Option<CallbackUrl>,
on_leave: Option<CallbackUrl>,
idle_timeout: Duration,
reconnect_timeout: Duration,
ping_interval: Duration,
}
impl Member {
#[inline]
#[must_use]
pub fn new(
id: MemberId,
credentials: Credential,
room_id: RoomId,
idle_timeout: Duration,
reconnect_timeout: Duration,
ping_interval: Duration,
) -> Self {
Self(Rc::new(RefCell::new(MemberInner {
id,
srcs: HashMap::new(),
sinks: HashMap::new(),
credentials,
room_id,
on_leave: None,
on_join: None,
idle_timeout,
reconnect_timeout,
ping_interval,
})))
}
fn get_member_from_room_spec(
&self,
room_spec: &RoomSpec,
member_id: &MemberId,
) -> Result<MemberSpec, MembersLoadError> {
let element = room_spec.pipeline.get(member_id).ok_or_else(|| {
MembersLoadError::MemberNotFound(Fid::<ToMember>::new(
self.room_id(),
member_id.clone(),
))
})?;
MemberSpec::try_from(element).map_err(|e| {
MembersLoadError::TryFromError(
e,
Fid::<ToMember>::new(self.room_id(), member_id.clone()).into(),
)
})
}
fn load(
&self,
room_spec: &RoomSpec,
store: &HashMap<MemberId, Self>,
) -> Result<(), MembersLoadError> {
let self_id = self.id();
let this_member_spec =
self.get_member_from_room_spec(room_spec, &self_id)?;
let this_member = store
.get(&self.id())
.ok_or_else(|| MembersLoadError::MemberNotFound(self.get_fid()))?;
this_member.set_callback_urls(&this_member_spec);
for (spec_play_name, spec_play_endpoint) in
this_member_spec.play_endpoints()
{
let publisher_id =
MemberId(spec_play_endpoint.src.member_id.to_string());
let publisher_member =
store.get(&publisher_id).ok_or_else(|| {
MembersLoadError::MemberNotFound(Fid::<ToMember>::new(
self.room_id(),
publisher_id,
))
})?;
let publisher_spec = self.get_member_from_room_spec(
room_spec,
&spec_play_endpoint.src.member_id,
)?;
let publisher_endpoint = publisher_spec
.get_publish_endpoint_by_id(
spec_play_endpoint.src.endpoint_id.clone(),
)
.ok_or_else(|| {
MembersLoadError::EndpointNotFound(
spec_play_endpoint.src.endpoint_id.to_string(),
)
})?;
if let Some(publisher) = publisher_member.get_src_by_id(
&spec_play_endpoint.src.endpoint_id.to_string().into(),
) {
let new_play_endpoint = WebRtcPlayEndpoint::new(
spec_play_name,
spec_play_endpoint.src.clone(),
publisher.downgrade(),
this_member.downgrade(),
spec_play_endpoint.force_relay,
);
self.insert_sink(new_play_endpoint.clone());
publisher.add_sink(new_play_endpoint.downgrade());
} else {
let new_publish_id = spec_play_endpoint.src.endpoint_id.clone();
let new_publish = WebRtcPublishEndpoint::new(
new_publish_id,
publisher_endpoint.p2p,
publisher_member.downgrade(),
publisher_endpoint.force_relay,
publisher_endpoint.audio_settings,
publisher_endpoint.video_settings,
);
let new_self_play = WebRtcPlayEndpoint::new(
spec_play_name,
spec_play_endpoint.src.clone(),
new_publish.downgrade(),
this_member.downgrade(),
spec_play_endpoint.force_relay,
);
new_publish.add_sink(new_self_play.downgrade());
publisher_member.insert_src(new_publish);
self.insert_sink(new_self_play);
}
}
this_member_spec
.publish_endpoints()
.filter(|(endpoint_id, _)| self.srcs().get(endpoint_id).is_none())
.for_each(|(endpoint_id, e)| {
self.insert_src(WebRtcPublishEndpoint::new(
endpoint_id,
e.p2p,
this_member.downgrade(),
e.force_relay,
e.audio_settings,
e.video_settings,
));
});
Ok(())
}
#[inline]
#[must_use]
pub fn get_fid(&self) -> Fid<ToMember> {
Fid::<ToMember>::new(self.room_id(), self.id())
}
#[inline]
#[must_use]
pub fn get_fid_to_endpoint(
&self,
endpoint_id: EndpointId,
) -> Fid<ToEndpoint> {
Fid::<ToEndpoint>::new(self.room_id(), self.id(), endpoint_id)
}
pub fn peers_removed(&self, peer_ids: &[PeerId]) {
self.srcs()
.values()
.for_each(|p| p.remove_peer_ids(peer_ids));
self.sinks()
.values()
.filter_map(|p| p.peer_id().map(|id| (id, p)))
.filter(|(id, _)| peer_ids.contains(&id))
.for_each(|(_, p)| p.reset());
}
#[inline]
#[must_use]
pub fn id(&self) -> MemberId {
self.0.borrow().id.clone()
}
#[inline]
#[must_use]
pub fn credentials(&self) -> Credential {
self.0.borrow().credentials.clone()
}
#[inline]
#[must_use]
pub fn verify_credentials(
&self,
credentials: &client_proto::Credential,
) -> bool {
self.0.borrow().credentials.verify(&credentials)
}
#[inline]
#[must_use]
pub fn srcs(&self) -> HashMap<WebRtcPublishId, WebRtcPublishEndpoint> {
self.0.borrow().srcs.clone()
}
#[inline]
#[must_use]
pub fn sinks(&self) -> HashMap<WebRtcPlayId, WebRtcPlayEndpoint> {
self.0.borrow().sinks.clone()
}
#[inline]
#[must_use]
pub fn sinks_ids(&self) -> Vec<WebRtcPlayId> {
self.0.borrow().sinks.keys().cloned().collect()
}
#[inline]
#[must_use]
pub fn srcs_ids(&self) -> Vec<WebRtcPublishId> {
self.0.borrow().srcs.keys().cloned().collect()
}
#[must_use]
pub fn partners(&self) -> Vec<Member> {
let this = self.0.borrow();
this.srcs
.values()
.flat_map(|src| src.sinks().into_iter().map(|s| s.owner()))
.chain(this.sinks.values().map(|s| s.src().owner()))
.map(|member| (member.id(), member))
.collect::<HashMap<_, _>>()
.into_iter()
.map(|(_, member)| member)
.collect()
}
pub fn insert_sink(&self, endpoint: WebRtcPlayEndpoint) {
self.0.borrow_mut().sinks.insert(endpoint.id(), endpoint);
}
pub fn insert_src(&self, endpoint: WebRtcPublishEndpoint) {
self.0.borrow_mut().srcs.insert(endpoint.id(), endpoint);
}
#[inline]
#[must_use]
pub fn get_src_by_id(
&self,
id: &WebRtcPublishId,
) -> Option<WebRtcPublishEndpoint> {
self.0.borrow().srcs.get(id).cloned()
}
#[inline]
#[must_use]
pub fn get_sink_by_id(
&self,
id: &WebRtcPlayId,
) -> Option<WebRtcPlayEndpoint> {
self.0.borrow().sinks.get(id).cloned()
}
#[inline]
#[must_use]
pub fn remove_sink(&self, id: &WebRtcPlayId) -> Option<WebRtcPlayEndpoint> {
self.0.borrow_mut().sinks.remove(id)
}
#[inline]
#[must_use]
pub fn remove_src(
&self,
id: &WebRtcPublishId,
) -> Option<WebRtcPublishEndpoint> {
self.0.borrow_mut().srcs.remove(id)
}
#[inline]
#[must_use]
pub fn room_id(&self) -> RoomId {
self.0.borrow().room_id.clone()
}
pub fn get_endpoint_by_id(
&self,
id: String,
) -> Result<Endpoint, MemberError> {
let webrtc_publish_id = id.into();
if let Some(publish_endpoint) = self.get_src_by_id(&webrtc_publish_id) {
return Ok(Endpoint::WebRtcPublishEndpoint(publish_endpoint));
}
let webrtc_play_id = String::from(webrtc_publish_id).into();
if let Some(play_endpoint) = self.get_sink_by_id(&webrtc_play_id) {
return Ok(Endpoint::WebRtcPlayEndpoint(play_endpoint));
}
Err(MemberError::EndpointNotFound(
self.get_fid_to_endpoint(webrtc_play_id.into()),
))
}
#[inline]
#[must_use]
pub fn downgrade(&self) -> WeakMember {
WeakMember(Rc::downgrade(&self.0))
}
#[cfg(test)]
#[inline]
#[must_use]
pub fn ptr_eq(&self, another_member: &Self) -> bool {
Rc::ptr_eq(&self.0, &another_member.0)
}
#[inline]
#[must_use]
pub fn get_on_join(&self) -> Option<CallbackUrl> {
self.0.borrow().on_join.clone()
}
#[inline]
#[must_use]
pub fn get_on_leave(&self) -> Option<CallbackUrl> {
self.0.borrow().on_leave.clone()
}
#[inline]
#[must_use]
pub fn get_idle_timeout(&self) -> Duration {
self.0.borrow().idle_timeout
}
#[inline]
#[must_use]
pub fn get_reconnect_timeout(&self) -> Duration {
self.0.borrow().reconnect_timeout
}
#[inline]
#[must_use]
pub fn get_ping_interval(&self) -> Duration {
self.0.borrow().ping_interval
}
pub fn set_callback_urls(&self, spec: &MemberSpec) {
self.0.borrow_mut().on_leave = spec.on_leave().clone();
self.0.borrow_mut().on_join = spec.on_join().clone();
}
}
#[derive(Clone, Debug)]
pub struct WeakMember(Weak<RefCell<MemberInner>>);
impl WeakMember {
#[inline]
#[must_use]
pub fn upgrade(&self) -> Member {
Member(Weak::upgrade(&self.0).unwrap())
}
#[inline]
#[must_use]
pub fn safe_upgrade(&self) -> Option<Member> {
Weak::upgrade(&self.0).map(Member)
}
}
pub fn parse_members(
room_spec: &RoomSpec,
rpc_conf: conf::Rpc,
) -> Result<HashMap<MemberId, Member>, MembersLoadError> {
let members_spec = room_spec.members().map_err(|e| {
MembersLoadError::TryFromError(
e,
Fid::<ToRoom>::new(room_spec.id.clone()).into(),
)
})?;
let members: HashMap<MemberId, Member> = members_spec
.iter()
.map(|(id, member)| {
let new_member = Member::new(
id.clone(),
member.credentials().clone(),
room_spec.id.clone(),
member.idle_timeout().unwrap_or(rpc_conf.idle_timeout),
member
.reconnect_timeout()
.unwrap_or(rpc_conf.reconnect_timeout),
member.ping_interval().unwrap_or(rpc_conf.ping_interval),
);
(id.clone(), new_member)
})
.collect();
for member in members.values() {
member.load(room_spec, &members)?;
}
debug!(
"Created ParticipantService with participants: {:?}.",
members
.iter()
.map(|(id, p)| {
format!(
"{{ id: {}, sinks: {:?}, srcs: {:?} }};",
id,
p.sinks()
.into_iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<String>>(),
p.srcs()
.into_iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<String>>()
)
})
.collect::<Vec<String>>()
);
Ok(members)
}
impl From<Member> for proto::Member {
fn from(m: Member) -> Self {
let member_pipeline = m
.sinks()
.into_iter()
.map(|(id, play)| (id.to_string(), play.into()))
.chain(
m.srcs()
.into_iter()
.map(|(id, publish)| (id.to_string(), publish.into())),
)
.collect();
Self {
id: m.id().to_string(),
credentials: Some(m.credentials().into()),
on_leave: m
.get_on_leave()
.map(|c| c.to_string())
.unwrap_or_default(),
on_join: m.get_on_join().map(|c| c.to_string()).unwrap_or_default(),
reconnect_timeout: Some(m.get_reconnect_timeout().into()),
idle_timeout: Some(m.get_idle_timeout().into()),
ping_interval: Some(m.get_ping_interval().into()),
pipeline: member_pipeline,
}
}
}
impl From<Member> for proto::room::Element {
#[inline]
fn from(m: Member) -> Self {
Self {
el: Some(proto::room::element::El::Member(m.into())),
}
}
}
impl From<Member> for proto::Element {
#[inline]
fn from(m: Member) -> Self {
Self {
el: Some(proto::element::El::Member(m.into())),
}
}
}
#[cfg(test)]
mod tests {
use medea_client_api_proto::MemberId;
use crate::api::control::RootElement;
use super::*;
const TEST_SPEC: &str = r#"
kind: Room
id: test-call
spec:
pipeline:
caller:
kind: Member
credentials:
plain: test
spec:
pipeline:
publish:
kind: WebRtcPublishEndpoint
spec:
p2p: Always
some-member:
kind: Member
credentials:
plain: test
spec:
pipeline:
publish:
kind: WebRtcPublishEndpoint
spec:
p2p: Always
responder:
kind: Member
credentials:
plain: test
spec:
pipeline:
play:
kind: WebRtcPlayEndpoint
spec:
src: "local://test-call/caller/publish"
play2:
kind: WebRtcPlayEndpoint
spec:
src: "local://test-call/some-member/publish"
"#;
#[inline]
fn id<T: From<String>>(s: &str) -> T {
T::from(s.to_string())
}
fn get_test_store() -> HashMap<MemberId, Member> {
let room_element: RootElement =
serde_yaml::from_str(TEST_SPEC).unwrap();
let room_spec = RoomSpec::try_from(&room_element).unwrap();
parse_members(&room_spec, conf::Rpc::default()).unwrap()
}
#[test]
pub fn load_store() {
let store = get_test_store();
let caller = store.get(&id("caller")).unwrap();
let responder = store.get(&id("responder")).unwrap();
let caller_publish_endpoint =
caller.get_src_by_id(&id("publish")).unwrap();
let responder_play_endpoint =
responder.get_sink_by_id(&id("play")).unwrap();
let is_caller_has_responder_in_sinks = caller_publish_endpoint
.sinks()
.into_iter()
.filter(|p| p.ptr_eq(&responder_play_endpoint))
.count()
== 1;
assert!(is_caller_has_responder_in_sinks);
assert!(responder_play_endpoint
.src()
.ptr_eq(&caller_publish_endpoint));
let some_member = store.get(&id("some-member")).unwrap();
assert!(some_member.sinks().is_empty());
assert_eq!(some_member.srcs().len(), 1);
let responder_play2_endpoint =
responder.get_sink_by_id(&id("play2")).unwrap();
let some_member_publisher =
some_member.get_src_by_id(&id("publish")).unwrap();
assert_eq!(some_member_publisher.sinks().len(), 1);
let is_some_member_has_responder_in_sinks = some_member_publisher
.sinks()
.into_iter()
.filter(|p| p.ptr_eq(&responder_play2_endpoint))
.count()
== 1;
assert!(is_some_member_has_responder_in_sinks);
}
#[test]
fn publisher_delete_all_their_players() {
let store = get_test_store();
let caller = store.get(&id("caller")).unwrap();
let some_member = store.get(&id("some-member")).unwrap();
let responder = store.get(&id("responder")).unwrap();
drop(caller.remove_src(&id("publish")));
assert_eq!(responder.sinks().len(), 1);
drop(some_member.remove_src(&id("publish")));
assert_eq!(responder.sinks().len(), 0);
}
#[test]
fn player_delete_self_from_publisher_sink() {
let store = get_test_store();
let caller = store.get(&id("caller")).unwrap();
let some_member = store.get(&id("some-member")).unwrap();
let responder = store.get(&id("responder")).unwrap();
let caller_publisher = caller.get_src_by_id(&id("publish")).unwrap();
let some_member_publisher =
some_member.get_src_by_id(&id("publish")).unwrap();
drop(responder.remove_sink(&id("play")));
assert_eq!(caller_publisher.sinks().len(), 0);
assert_eq!(some_member_publisher.sinks().len(), 1);
drop(responder.remove_sink(&id("play2")));
assert_eq!(caller_publisher.sinks().len(), 0);
assert_eq!(some_member_publisher.sinks().len(), 0);
}
}