use std::{cell::RefCell, collections::HashMap, rc::Rc, time::Duration};
use futures::{channel::mpsc, future};
use medea_client_api_proto::{self as proto, PeerId};
use medea_macro::watchers;
use medea_reactive::ObservableHashMap;
use tracerr::Traced;
use wasm_bindgen_futures::spawn_local;
use crate::{
api::{Connections, RoomError},
media::{LocalTracksConstraints, MediaManager, RecvConstraints},
peer,
utils::{
component, delay_for, AsProtoState, SynchronizableState, TaskHandle,
Updatable as _,
},
};
use super::{PeerConnection, PeerEvent};
/// Alias for the [`component::Component`] that pairs the peers [`State`] with
/// the [`Repository`] holding the corresponding [`peer::Component`]s.
pub type Component = component::Component<State, Repository>;
impl Component {
    /// Looks up the [`PeerConnection`] stored under the provided [`PeerId`].
    ///
    /// Returns [`None`] when no peer with such ID is present in the
    /// [`Repository`].
    #[inline]
    #[must_use]
    pub fn get(&self, id: PeerId) -> Option<Rc<PeerConnection>> {
        let peers = self.peers.borrow();
        peers.get(&id).map(component::Component::obj)
    }

    /// Returns every [`PeerConnection`] currently stored in the
    /// [`Repository`].
    #[inline]
    #[must_use]
    pub fn get_all(&self) -> Vec<Rc<PeerConnection>> {
        let peers = self.peers.borrow();
        let mut connections = Vec::with_capacity(peers.len());
        connections.extend(peers.values().map(component::Component::obj));
        connections
    }

    /// Calls `connection_lost()` on the state of every stored peer.
    #[inline]
    pub fn connection_lost(&self) {
        let peers = self.peers.borrow();
        for peer_state in peers.values().map(component::Component::state) {
            peer_state.connection_lost();
        }
    }

    /// Calls `connection_recovered()` on the state of every stored peer.
    #[inline]
    pub fn connection_recovered(&self) {
        let peers = self.peers.borrow();
        for peer_state in peers.values().map(component::Component::state) {
            peer_state.connection_recovered();
        }
    }

    /// Synchronizes the local [`State`] with the provided
    /// [`proto::state::Room`].
    ///
    /// Peers missing from `new_state` are removed first. After that each peer
    /// of `new_state` is either applied onto the already known
    /// [`peer::State`], or inserted as a new one built with
    /// [`peer::State::from_proto`].
    pub fn apply(&self, new_state: proto::state::Room) {
        let state = self.state();
        let send_cons = &self.obj().send_constraints;

        state.0.borrow_mut().remove_not_present(&new_state.peers);

        for (peer_id, proto_peer) in new_state.peers {
            // The lookup lives in its own statement, so the shared borrow is
            // released before the map is (possibly) borrowed mutably below.
            let known = state.0.borrow().get(&peer_id).cloned();
            match known {
                Some(existing) => {
                    existing.apply(proto_peer, send_cons);
                }
                None => {
                    let fresh =
                        Rc::new(peer::State::from_proto(proto_peer, send_cons));
                    state.0.borrow_mut().insert(peer_id, fresh);
                }
            }
        }
    }
}
/// State of the peers [`Component`]: an observable map from [`PeerId`] to the
/// shared [`peer::State`] of that peer.
///
/// The map's `on_insert()` and `on_remove()` streams are what the
/// [`Component`] watchers subscribe to.
#[derive(Default)]
pub struct State(RefCell<ObservableHashMap<PeerId, Rc<peer::State>>>);
/// Storage of all the [`peer::Component`]s together with everything required
/// to construct a new [`PeerConnection`].
pub struct Repository {
/// [`MediaManager`] handed to every created [`PeerConnection`].
media_manager: Rc<MediaManager>,
/// All the [`peer::Component`]s of this [`Repository`], keyed by [`PeerId`].
///
/// Shared with the stats scraping task, hence the [`Rc`].
peers: Rc<RefCell<HashMap<PeerId, peer::Component>>>,
/// [`TaskHandle`] of the task spawned by
/// `Repository::spawn_peers_stats_scrape_task()`.
///
/// Never read; presumably kept so the task is stopped once this
/// [`Repository`] is dropped (TODO confirm against [`TaskHandle`]).
_stats_scrape_task: TaskHandle,
/// Sender of [`PeerEvent`]s, cloned into every created [`PeerConnection`].
peer_event_sender: mpsc::UnboundedSender<PeerEvent>,
/// [`LocalTracksConstraints`] cloned into every created [`PeerConnection`]
/// and used when applying a [`proto::state::Room`].
send_constraints: LocalTracksConstraints,
/// [`Connections`] handed to every created [`PeerConnection`]; its
/// `close_connection()` is called when a peer is removed.
connections: Rc<Connections>,
/// [`RecvConstraints`] handed to every created [`PeerConnection`].
recv_constraints: Rc<RecvConstraints>,
}
impl Repository {
    /// Creates a new empty [`Repository`] and spawns the task which
    /// periodically scrapes stats of all its peers.
    #[must_use]
    pub fn new(
        media_manager: Rc<MediaManager>,
        peer_event_sender: mpsc::UnboundedSender<PeerEvent>,
        send_constraints: LocalTracksConstraints,
        recv_constraints: Rc<RecvConstraints>,
        connections: Rc<Connections>,
    ) -> Self {
        let peers: Rc<RefCell<HashMap<PeerId, peer::Component>>> =
            Rc::default();
        let stats_scrape_task =
            Self::spawn_peers_stats_scrape_task(Rc::clone(&peers));

        Self {
            media_manager,
            peers,
            _stats_scrape_task: stats_scrape_task,
            peer_event_sender,
            send_constraints,
            connections,
            recv_constraints,
        }
    }

    /// Spawns a local task that endlessly waits one second (via
    /// [`delay_for`]) and then calls `scrape_and_send_peer_stats()` on every
    /// peer found in the provided map, awaiting all those calls together.
    ///
    /// The returned [`TaskHandle`] is built from the abort handle of the
    /// spawned future.
    fn spawn_peers_stats_scrape_task(
        peers: Rc<RefCell<HashMap<PeerId, peer::Component>>>,
    ) -> TaskHandle {
        let scrape_loop = async move {
            loop {
                delay_for(Duration::from_secs(1).into()).await;

                // Snapshot the connections in a single statement, so the
                // `RefCell` borrow is gone before anything is awaited.
                let connections: Vec<_> = peers
                    .borrow()
                    .values()
                    .map(component::Component::obj)
                    .collect();
                let scrapes = connections
                    .iter()
                    .map(|conn| conn.scrape_and_send_peer_stats());
                future::join_all(scrapes).await;
            }
        };
        let (task, abort_handle) = future::abortable(scrape_loop);

        spawn_local(async move {
            // Being aborted is the expected way for this task to finish.
            task.await.ok();
        });

        abort_handle.into()
    }
}
impl State {
    /// Stores the provided [`peer::State`] under the given [`PeerId`],
    /// wrapping it into an [`Rc`].
    #[inline]
    pub fn insert(&self, peer_id: PeerId, peer_state: peer::State) {
        let shared = Rc::new(peer_state);
        self.0.borrow_mut().insert(peer_id, shared);
    }

    /// Returns the [`peer::State`] stored under the given [`PeerId`], if any.
    #[inline]
    #[must_use]
    pub fn get(&self, peer_id: PeerId) -> Option<Rc<peer::State>> {
        let peers = self.0.borrow();
        peers.get(&peer_id).map(Rc::clone)
    }

    /// Removes the [`peer::State`] stored under the given [`PeerId`].
    #[inline]
    pub fn remove(&self, peer_id: PeerId) {
        let mut peers = self.0.borrow_mut();
        peers.remove(&peer_id);
    }
}
impl AsProtoState for State {
    type Output = proto::state::Room;

    /// Converts this [`State`] into a [`proto::state::Room`] by converting
    /// every stored [`peer::State`] with its own `as_proto()`.
    #[inline]
    fn as_proto(&self) -> Self::Output {
        let peers = self
            .0
            .borrow()
            .iter()
            .map(|(peer_id, peer_state)| (*peer_id, peer_state.as_proto()))
            .collect();

        Self::Output { peers }
    }
}
#[watchers]
impl Component {
    /// Watches insertions into the [`State`] map.
    ///
    /// Builds a [`PeerConnection`] for the inserted [`peer::State`], wraps
    /// both into a [`peer::Component`] and stores it in the [`Repository`].
    ///
    /// # Errors
    ///
    /// Returns the [`RoomError`] converted from the [`PeerConnection::new`]
    /// failure.
    #[watch(self.0.borrow().on_insert())]
    async fn peer_added(
        peers: Rc<Repository>,
        _: Rc<State>,
        (peer_id, new_peer): (PeerId, Rc<peer::State>),
    ) -> Result<(), Traced<RoomError>> {
        let connection = PeerConnection::new(
            &new_peer,
            peers.peer_event_sender.clone(),
            Rc::clone(&peers.media_manager),
            peers.send_constraints.clone(),
            Rc::clone(&peers.connections),
            Rc::clone(&peers.recv_constraints),
        )
        .map_err(tracerr::map_from_and_wrap!())?;
        let component = peer::Component::new(connection, new_peer);

        peers.peers.borrow_mut().insert(peer_id, component);

        Ok(())
    }

    /// Watches removals from the [`State`] map.
    ///
    /// Drops the matching [`peer::Component`] from the [`Repository`] and then
    /// calls `close_connection()` on [`Connections`] for the removed
    /// [`PeerId`].
    #[inline]
    #[watch(self.0.borrow().on_remove())]
    async fn peer_removed(
        peers: Rc<Repository>,
        _: Rc<State>,
        (peer_id, _): (PeerId, Rc<peer::State>),
    ) -> Result<(), Traced<RoomError>> {
        // The removed component (if any) is dropped right here, before the
        // connection gets closed.
        drop(peers.peers.borrow_mut().remove(&peer_id));
        peers.connections.close_connection(peer_id);

        Ok(())
    }
}