use std::{
cell::RefCell,
collections::{HashMap, HashSet},
rc::{Rc, Weak},
};
use medea_client_api_proto::{PeerId, TrackId};
use medea_control_api_proto::grpc::api as proto;
use crate::{
api::control::endpoints::webrtc_publish_endpoint::{
AudioSettings, P2pMode, VideoSettings, WebRtcPublishId as Id,
},
signalling::elements::{
endpoints::webrtc::play_endpoint::WeakWebRtcPlayEndpoint,
member::WeakMember, Member,
},
};
use super::play_endpoint::WebRtcPlayEndpoint;
#[derive(Clone, Debug)]
struct WebRtcPublishEndpointInner {
id: Id,
tracks_ids: HashMap<PeerId, Vec<TrackId>>,
p2p: P2pMode,
is_force_relayed: bool,
sinks: Vec<WeakWebRtcPlayEndpoint>,
owner: WeakMember,
audio_settings: AudioSettings,
video_settings: VideoSettings,
peer_ids: HashSet<PeerId>,
}
impl Drop for WebRtcPublishEndpointInner {
fn drop(&mut self) {
for receiver in self
.sinks
.iter()
.filter_map(WeakWebRtcPlayEndpoint::safe_upgrade)
{
if let Some(receiver_owner) = receiver.weak_owner().safe_upgrade() {
drop(receiver_owner.remove_sink(&receiver.id()))
}
}
}
}
impl WebRtcPublishEndpointInner {
fn add_sinks(&mut self, sink: WeakWebRtcPlayEndpoint) {
self.sinks.push(sink);
}
fn sinks(&self) -> Vec<WebRtcPlayEndpoint> {
self.sinks
.iter()
.map(WeakWebRtcPlayEndpoint::upgrade)
.collect()
}
fn owner(&self) -> Member {
self.owner.upgrade()
}
fn add_peer_id(&mut self, peer_id: PeerId) {
self.peer_ids.insert(peer_id);
}
fn peer_ids(&self) -> HashSet<PeerId> {
self.peer_ids.clone()
}
fn reset(&mut self) {
self.peer_ids = HashSet::new()
}
#[allow(clippy::trivially_copy_pass_by_ref)]
fn remove_peer_id(&mut self, peer_id: &PeerId) {
self.peer_ids.remove(peer_id);
}
fn remove_peer_ids(&mut self, peer_ids: &[PeerId]) {
for peer_id in peer_ids {
self.remove_peer_id(peer_id)
}
}
}
#[derive(Debug, Clone)]
pub struct WebRtcPublishEndpoint(Rc<RefCell<WebRtcPublishEndpointInner>>);
impl WebRtcPublishEndpoint {
#[inline]
#[must_use]
pub fn new(
id: Id,
p2p: P2pMode,
owner: WeakMember,
is_force_relayed: bool,
audio_settings: AudioSettings,
video_settings: VideoSettings,
) -> Self {
Self(Rc::new(RefCell::new(WebRtcPublishEndpointInner {
id,
p2p,
is_force_relayed,
sinks: Vec::new(),
owner,
audio_settings,
video_settings,
peer_ids: HashSet::new(),
tracks_ids: HashMap::new(),
})))
}
#[inline]
pub fn add_sink(&self, sink: WeakWebRtcPlayEndpoint) {
self.0.borrow_mut().add_sinks(sink)
}
#[inline]
#[must_use]
pub fn sinks(&self) -> Vec<WebRtcPlayEndpoint> {
self.0.borrow().sinks()
}
#[inline]
#[must_use]
pub fn owner(&self) -> Member {
self.0.borrow().owner()
}
#[inline]
pub fn add_peer_id(&self, peer_id: PeerId) {
self.0.borrow_mut().add_peer_id(peer_id)
}
#[inline]
#[must_use]
pub fn peer_ids(&self) -> HashSet<PeerId> {
self.0.borrow().peer_ids()
}
#[inline]
pub fn reset(&self) {
self.0.borrow_mut().reset()
}
#[inline]
pub fn remove_peer_ids(&self, peer_ids: &[PeerId]) {
self.0.borrow_mut().remove_peer_ids(peer_ids)
}
#[inline]
#[must_use]
pub fn id(&self) -> Id {
self.0.borrow().id.clone()
}
#[inline]
pub fn remove_empty_weaks_from_sinks(&self) {
self.0
.borrow_mut()
.sinks
.retain(|e| e.safe_upgrade().is_some());
}
#[inline]
#[must_use]
pub fn p2p(&self) -> P2pMode {
self.0.borrow().p2p
}
#[inline]
#[must_use]
pub fn is_force_relayed(&self) -> bool {
self.0.borrow().is_force_relayed
}
#[inline]
pub fn add_track_id(&self, peer_id: PeerId, track_id: TrackId) {
let mut inner = self.0.borrow_mut();
inner.tracks_ids.entry(peer_id).or_default().push(track_id);
}
#[inline]
#[must_use]
pub fn get_tracks_ids_by_peer_id(&self, peer_id: PeerId) -> Vec<TrackId> {
let inner = self.0.borrow();
inner.tracks_ids.get(&peer_id).cloned().unwrap_or_default()
}
#[allow(clippy::unused_self)]
#[inline]
#[must_use]
pub fn has_traffic_callback(&self) -> bool {
true
}
#[inline]
#[must_use]
pub fn audio_settings(&self) -> AudioSettings {
self.0.borrow().audio_settings
}
#[inline]
#[must_use]
pub fn video_settings(&self) -> VideoSettings {
self.0.borrow().video_settings
}
#[inline]
#[must_use]
pub fn downgrade(&self) -> WeakWebRtcPublishEndpoint {
WeakWebRtcPublishEndpoint(Rc::downgrade(&self.0))
}
#[cfg(test)]
#[inline]
pub fn ptr_eq(&self, another_publish: &Self) -> bool {
Rc::ptr_eq(&self.0, &another_publish.0)
}
}
#[derive(Clone, Debug)]
pub struct WeakWebRtcPublishEndpoint(Weak<RefCell<WebRtcPublishEndpointInner>>);
impl WeakWebRtcPublishEndpoint {
#[inline]
#[must_use]
pub fn upgrade(&self) -> WebRtcPublishEndpoint {
WebRtcPublishEndpoint(self.0.upgrade().unwrap())
}
#[inline]
#[must_use]
pub fn safe_upgrade(&self) -> Option<WebRtcPublishEndpoint> {
self.0.upgrade().map(WebRtcPublishEndpoint)
}
}
impl From<WebRtcPublishEndpoint> for proto::WebRtcPublishEndpoint {
fn from(endpoint: WebRtcPublishEndpoint) -> Self {
let p2p: proto::web_rtc_publish_endpoint::P2p = endpoint.p2p().into();
Self {
p2p: p2p as i32,
id: endpoint.id().to_string(),
force_relay: endpoint.is_force_relayed(),
audio_settings: Some(endpoint.audio_settings().into()),
video_settings: Some(endpoint.video_settings().into()),
on_stop: String::new(),
on_start: String::new(),
}
}
}
impl From<WebRtcPublishEndpoint> for proto::member::Element {
#[inline]
fn from(endpoint: WebRtcPublishEndpoint) -> Self {
Self {
el: Some(proto::member::element::El::WebrtcPub(endpoint.into())),
}
}
}
impl From<WebRtcPublishEndpoint> for proto::Element {
#[inline]
fn from(endpoint: WebRtcPublishEndpoint) -> Self {
Self {
el: Some(proto::element::El::WebrtcPub(endpoint.into())),
}
}
}