1use std::{cell::RefCell, collections::HashMap, rc::Rc, time::Duration};
4
5use futures::{channel::mpsc, future};
6use medea_client_api_proto::{self as proto, PeerId};
7use medea_macro::watchers;
8use medea_reactive::ObservableHashMap;
9use tracerr::Traced;
10
11use super::{PeerConnection, PeerEvent};
12use crate::{
13 connection::Connections,
14 media::{LocalTracksConstraints, MediaManager, RecvConstraints},
15 peer::{self, RtcPeerConnectionError},
16 platform,
17 utils::{
18 AsProtoState, SynchronizableState as _, TaskHandle, Updatable as _,
19 component,
20 },
21};
22
/// Component pairing the peers [`State`] with the peers [`Repository`].
///
/// Its watchers create a [`peer::Component`] for every [`peer::State`]
/// inserted into the [`State`] and drop it once that [`peer::State`] is
/// removed.
pub type Component = component::Component<State, Repository>;
25
26impl Component {
27 #[must_use]
29 pub fn get(&self, id: PeerId) -> Option<Rc<PeerConnection>> {
30 self.peers.borrow().get(&id).map(component::Component::obj)
31 }
32
33 #[must_use]
35 pub fn get_all(&self) -> Vec<Rc<PeerConnection>> {
36 self.peers.borrow().values().map(component::Component::obj).collect()
37 }
38
39 pub fn connection_lost(&self) {
41 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
42 for peer in self.peers.borrow().values() {
43 peer.state().connection_lost();
44 }
45 }
46
47 pub fn connection_recovered(&self) {
49 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
50 for peer in self.peers.borrow().values() {
51 peer.state().connection_recovered();
52 }
53 }
54
55 pub fn apply(&self, new_state: proto::state::Room) {
57 let state = self.state();
58 let send_cons = &self.obj().send_constraints;
59
60 state.0.borrow_mut().remove_not_present(&new_state.peers);
61
62 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
63 for (id, peer_state) in new_state.peers {
64 let peer = state.0.borrow().get(&id).cloned();
65 if let Some(p) = peer {
66 p.apply(peer_state, send_cons);
67 } else {
68 drop(state.0.borrow_mut().insert(
69 id,
70 Rc::new(peer::State::from_proto(peer_state, send_cons)),
71 ));
72 }
73 }
74 }
75}
76
/// State of the [`Component`]: an observable map of [`peer::State`]s keyed by
/// their [`PeerId`].
///
/// Insertions into and removals from this map are what the [`Component`]'s
/// watchers react to.
#[derive(Debug, Default)]
pub struct State(RefCell<ObservableHashMap<PeerId, Rc<peer::State>>>);
80
/// Context of the [`Component`]: the existing [`peer::Component`]s along with
/// everything that is handed to a [`PeerConnection`] on its creation.
#[derive(Debug)]
pub struct Repository {
    /// [`MediaManager`] handed to every [`PeerConnection`] created for this
    /// repository.
    media_manager: Rc<MediaManager>,

    /// [`peer::Component`]s of this repository, keyed by their [`PeerId`].
    ///
    /// Shared with the stats scraping task.
    peers: Rc<RefCell<HashMap<PeerId, peer::Component>>>,

    /// Handle of the task spawned in [`Repository::new()`], which scrapes
    /// stats of all the `peers` once per second.
    ///
    /// NOTE(review): presumably aborts the task once dropped; confirm against
    /// [`TaskHandle`].
    _stats_scrape_task: TaskHandle,

    /// Sender of [`PeerEvent`]s, cloned into every [`PeerConnection`] created
    /// for this repository.
    peer_event_sender: mpsc::UnboundedSender<PeerEvent>,

    /// Constraints of local tracks, cloned into every created
    /// [`PeerConnection`] and used when applying a [`proto::state::Room`].
    send_constraints: LocalTracksConstraints,

    /// [`Connections`] handed to every created [`PeerConnection`].
    ///
    /// Tracks of a removed peer are removed from here as well.
    connections: Rc<Connections>,

    /// [`RecvConstraints`] handed to every [`PeerConnection`] created for this
    /// repository.
    recv_constraints: Rc<RecvConstraints>,
}
123
124impl Repository {
125 #[must_use]
129 pub fn new(
130 media_manager: Rc<MediaManager>,
131 peer_event_sender: mpsc::UnboundedSender<PeerEvent>,
132 send_constraints: LocalTracksConstraints,
133 recv_constraints: Rc<RecvConstraints>,
134 connections: Rc<Connections>,
135 ) -> Self {
136 let peers = Rc::default();
137 Self {
138 media_manager,
139 _stats_scrape_task: Self::spawn_peers_stats_scrape_task(Rc::clone(
140 &peers,
141 )),
142 peers,
143 peer_event_sender,
144 send_constraints,
145 recv_constraints,
146 connections,
147 }
148 }
149
150 fn spawn_peers_stats_scrape_task(
156 peers: Rc<RefCell<HashMap<PeerId, peer::Component>>>,
157 ) -> TaskHandle {
158 let (fut, abort) = future::abortable(async move {
159 #[expect( clippy::infinite_loop,
161 reason = "cannot annotate `async` block with `-> !`"
162 )]
163 loop {
164 platform::delay_for(Duration::from_secs(1)).await;
165
166 let peers = peers
167 .borrow()
168 .values()
169 .map(component::Component::obj)
170 .collect::<Vec<_>>();
171 drop(
172 future::join_all(
173 peers.iter().map(|p| p.scrape_and_send_peer_stats()),
174 )
175 .await,
176 );
177 }
178 });
179
180 platform::spawn(async move {
181 _ = fut.await.ok();
182 });
183
184 abort.into()
185 }
186}
187
188impl State {
189 pub fn insert(&self, peer_id: PeerId, peer_state: peer::State) {
191 drop(self.0.borrow_mut().insert(peer_id, Rc::new(peer_state)));
192 }
193
194 #[must_use]
196 pub fn get(&self, peer_id: PeerId) -> Option<Rc<peer::State>> {
197 self.0.borrow().get(&peer_id).cloned()
198 }
199
200 pub fn remove(&self, peer_id: PeerId) {
202 drop(self.0.borrow_mut().remove(&peer_id));
203 }
204}
205
206impl AsProtoState for State {
207 type Output = proto::state::Room;
208
209 fn as_proto(&self) -> Self::Output {
210 Self::Output {
211 peers: self
212 .0
213 .borrow()
214 .iter()
215 .map(|(id, p)| (*id, p.as_proto()))
216 .collect(),
217 }
218 }
219}
220
221#[watchers]
222impl Component {
223 #[watch(self.0.borrow().on_insert())]
227 async fn peer_added(
228 peers: Rc<Repository>,
229 _: Rc<State>,
230 (peer_id, new_peer): (PeerId, Rc<peer::State>),
231 ) -> Result<(), Traced<RtcPeerConnectionError>> {
232 let peer = peer::Component::new(
233 PeerConnection::new(
234 &new_peer,
235 peers.peer_event_sender.clone(),
236 Rc::clone(&peers.media_manager),
237 peers.send_constraints.clone(),
238 Rc::clone(&peers.connections),
239 Rc::clone(&peers.recv_constraints),
240 )
241 .await
242 .map_err(tracerr::map_from_and_wrap!())?,
243 new_peer,
244 );
245
246 drop(peers.peers.borrow_mut().insert(peer_id, peer));
247
248 Ok(())
249 }
250
251 #[watch(self.0.borrow().on_remove())]
258 fn peer_removed(
259 peers: &Repository,
260 _: &State,
261 (peer_id, peer): (PeerId, Rc<peer::State>),
262 ) {
263 drop(peers.peers.borrow_mut().remove(&peer_id));
264 for t in peer.get_recv_tracks() {
265 peers.connections.remove_track(&t);
266 }
267 for t in peer.get_send_tracks() {
268 peers.connections.remove_track(&t);
269 }
270 }
271}