use std::{cell::RefCell, collections::HashMap, rc::Rc, time::Duration};
use futures::{channel::mpsc, future};
use medea_client_api_proto::{self as proto, PeerId};
use medea_macro::watchers;
use medea_reactive::ObservableHashMap;
use tracerr::Traced;
use crate::{
connection::Connections,
media::{LocalTracksConstraints, MediaManager, RecvConstraints},
peer::{self, RtcPeerConnectionError},
platform,
utils::{
component, AsProtoState, SynchronizableState as _, TaskHandle,
Updatable as _,
},
};
use super::{PeerConnection, PeerEvent};
pub type Component = component::Component<State, Repository>;
impl Component {
#[must_use]
pub fn get(&self, id: PeerId) -> Option<Rc<PeerConnection>> {
self.peers.borrow().get(&id).map(component::Component::obj)
}
#[must_use]
pub fn get_all(&self) -> Vec<Rc<PeerConnection>> {
self.peers
.borrow()
.values()
.map(component::Component::obj)
.collect()
}
pub fn connection_lost(&self) {
#[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
for peer in self.peers.borrow().values() {
peer.state().connection_lost();
}
}
pub fn connection_recovered(&self) {
#[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
for peer in self.peers.borrow().values() {
peer.state().connection_recovered();
}
}
pub fn apply(&self, new_state: proto::state::Room) {
let state = self.state();
let send_cons = &self.obj().send_constraints;
state.0.borrow_mut().remove_not_present(&new_state.peers);
#[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
for (id, peer_state) in new_state.peers {
let peer = state.0.borrow().get(&id).cloned();
if let Some(p) = peer {
p.apply(peer_state, send_cons);
} else {
drop(state.0.borrow_mut().insert(
id,
Rc::new(peer::State::from_proto(peer_state, send_cons)),
));
}
}
}
}
#[derive(Debug, Default)]
pub struct State(RefCell<ObservableHashMap<PeerId, Rc<peer::State>>>);
#[derive(Debug)]
pub struct Repository {
media_manager: Rc<MediaManager>,
peers: Rc<RefCell<HashMap<PeerId, peer::Component>>>,
_stats_scrape_task: TaskHandle,
peer_event_sender: mpsc::UnboundedSender<PeerEvent>,
send_constraints: LocalTracksConstraints,
connections: Rc<Connections>,
recv_constraints: Rc<RecvConstraints>,
}
impl Repository {
#[must_use]
pub fn new(
media_manager: Rc<MediaManager>,
peer_event_sender: mpsc::UnboundedSender<PeerEvent>,
send_constraints: LocalTracksConstraints,
recv_constraints: Rc<RecvConstraints>,
connections: Rc<Connections>,
) -> Self {
let peers = Rc::default();
Self {
media_manager,
_stats_scrape_task: Self::spawn_peers_stats_scrape_task(Rc::clone(
&peers,
)),
peers,
peer_event_sender,
send_constraints,
recv_constraints,
connections,
}
}
fn spawn_peers_stats_scrape_task(
peers: Rc<RefCell<HashMap<PeerId, peer::Component>>>,
) -> TaskHandle {
let (fut, abort) = future::abortable(async move {
#[expect( // intentional
clippy::infinite_loop,
reason = "cannot annotate `async` block with `-> !`"
)]
loop {
platform::delay_for(Duration::from_secs(1)).await;
let peers = peers
.borrow()
.values()
.map(component::Component::obj)
.collect::<Vec<_>>();
drop(
future::join_all(
peers.iter().map(|p| p.scrape_and_send_peer_stats()),
)
.await,
);
}
});
platform::spawn(async move {
_ = fut.await.ok();
});
abort.into()
}
}
impl State {
pub fn insert(&self, peer_id: PeerId, peer_state: peer::State) {
drop(self.0.borrow_mut().insert(peer_id, Rc::new(peer_state)));
}
#[must_use]
pub fn get(&self, peer_id: PeerId) -> Option<Rc<peer::State>> {
self.0.borrow().get(&peer_id).cloned()
}
pub fn remove(&self, peer_id: PeerId) {
drop(self.0.borrow_mut().remove(&peer_id));
}
}
impl AsProtoState for State {
type Output = proto::state::Room;
fn as_proto(&self) -> Self::Output {
Self::Output {
peers: self
.0
.borrow()
.iter()
.map(|(id, p)| (*id, p.as_proto()))
.collect(),
}
}
}
#[watchers]
impl Component {
#[watch(self.0.borrow().on_insert())]
async fn peer_added(
peers: Rc<Repository>,
_: Rc<State>,
(peer_id, new_peer): (PeerId, Rc<peer::State>),
) -> Result<(), Traced<RtcPeerConnectionError>> {
let peer = peer::Component::new(
PeerConnection::new(
&new_peer,
peers.peer_event_sender.clone(),
Rc::clone(&peers.media_manager),
peers.send_constraints.clone(),
Rc::clone(&peers.connections),
Rc::clone(&peers.recv_constraints),
)
.await
.map_err(tracerr::map_from_and_wrap!())?,
new_peer,
);
drop(peers.peers.borrow_mut().insert(peer_id, peer));
Ok(())
}
#[watch(self.0.borrow().on_remove())]
fn peer_removed(
peers: &Repository,
_: &State,
(peer_id, peer): (PeerId, Rc<peer::State>),
) {
drop(peers.peers.borrow_mut().remove(&peer_id));
for t in peer.get_recv_tracks() {
peers.connections.remove_track(&t);
}
for t in peer.get_send_tracks() {
peers.connections.remove_track(&t);
}
}
}