medea_jason/peer/repo.rs

//! Component responsible for creating and removing [`peer::Component`]s.

use std::{cell::RefCell, collections::HashMap, rc::Rc, time::Duration};

use futures::{channel::mpsc, future};
use medea_client_api_proto::{self as proto, PeerId};
use medea_macro::watchers;
use medea_reactive::ObservableHashMap;
use tracerr::Traced;

use super::{PeerConnection, PeerEvent};
use crate::{
    connection::Connections,
    media::{LocalTracksConstraints, MediaManager, RecvConstraints},
    peer::{self, RtcPeerConnectionError},
    platform,
    utils::{
        AsProtoState, SynchronizableState as _, TaskHandle, Updatable as _,
        component,
    },
};

/// Component responsible for creating and removing [`peer::Component`]s.
pub type Component = component::Component<State, Repository>;

impl Component {
    /// Returns [`PeerConnection`] stored in the repository by its ID.
    #[must_use]
    pub fn get(&self, id: PeerId) -> Option<Rc<PeerConnection>> {
        self.peers.borrow().get(&id).map(component::Component::obj)
    }

    /// Returns all [`PeerConnection`]s stored in the repository.
    #[must_use]
    pub fn get_all(&self) -> Vec<Rc<PeerConnection>> {
        self.peers.borrow().values().map(component::Component::obj).collect()
    }

    /// Notifies all [`peer::Component`]s about an RPC connection loss.
    pub fn connection_lost(&self) {
        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for peer in self.peers.borrow().values() {
            peer.state().connection_lost();
        }
    }

    /// Notifies all [`peer::Component`]s about an RPC connection restore.
    pub fn connection_recovered(&self) {
        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for peer in self.peers.borrow().values() {
            peer.state().connection_recovered();
        }
    }

    /// Updates the [`State`] of this [`Component`] with the provided
    /// [`proto::state::Room`].
    pub fn apply(&self, new_state: proto::state::Room) {
        let state = self.state();
        let send_cons = &self.obj().send_constraints;

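        // Drop `peer::State`s of peers absent from the new `Room` snapshot.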
        state.0.borrow_mut().remove_not_present(&new_state.peers);

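        // Apply the update to each already known peer, or create a new
        // `peer::State` from its proto representation.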
        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for (id, peer_state) in new_state.peers {
            let peer = state.0.borrow().get(&id).cloned();
            if let Some(p) = peer {
                p.apply(peer_state, send_cons);
            } else {
                drop(state.0.borrow_mut().insert(
                    id,
                    Rc::new(peer::State::from_proto(peer_state, send_cons)),
                ));
            }
        }
    }
}

/// State of the [`Component`].
#[derive(Debug, Default)]
pub struct State(RefCell<ObservableHashMap<PeerId, Rc<peer::State>>>);

/// Context of the [`Component`].
#[derive(Debug)]
pub struct Repository {
    /// [`MediaManager`] for injecting into newly created [`PeerConnection`]s.
    media_manager: Rc<MediaManager>,

    /// Mapping of [`PeerId`]s to their [`peer::Component`]s.
    peers: Rc<RefCell<HashMap<PeerId, peer::Component>>>,

    /// [`TaskHandle`] for a task which will call
    /// [`PeerConnection::scrape_and_send_peer_stats()`] of all
    /// [`PeerConnection`]s every second and send updated
    /// [`PeerMetrics::RtcStats`] to the server.
    ///
    /// [`PeerMetrics::RtcStats`]:
    /// medea_client_api_proto::PeerMetrics::RtcStats
    _stats_scrape_task: TaskHandle,

    /// Channel for sending events produced by [`PeerConnection`] to [`Room`].
    ///
    /// [`Room`]: crate::room::Room
    peer_event_sender: mpsc::UnboundedSender<PeerEvent>,

    /// Constraints to local [`local::Track`]s that are being published by
    /// [`PeerConnection`]s from this [`Repository`].
    ///
    /// [`local::Track`]: crate::media::track::local::Track
    send_constraints: LocalTracksConstraints,

    /// Collection of [`Connection`]s with remote `Member`s.
    ///
    /// [`Connection`]: crate::connection::Connection
    connections: Rc<Connections>,

    /// Constraints to the [`remote::Track`]s received by [`PeerConnection`]s
    /// from this [`Repository`].
    ///
    /// Used to disable or enable media receiving.
    ///
    /// [`remote::Track`]: crate::media::track::remote::Track
    recv_constraints: Rc<RecvConstraints>,
}

impl Repository {
    /// Creates a new [`Repository`].
    ///
    /// Spawns a task for scraping [`platform::RtcStats`].
    #[must_use]
    pub fn new(
        media_manager: Rc<MediaManager>,
        peer_event_sender: mpsc::UnboundedSender<PeerEvent>,
        send_constraints: LocalTracksConstraints,
        recv_constraints: Rc<RecvConstraints>,
        connections: Rc<Connections>,
    ) -> Self {
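        // Create the shared peers map up front, so the stats scrape task
        // spawned below can observe every `PeerConnection` stored in it.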
        let peers = Rc::default();
        Self {
            media_manager,
            _stats_scrape_task: Self::spawn_peers_stats_scrape_task(Rc::clone(
                &peers,
            )),
            peers,
            peer_event_sender,
            send_constraints,
            recv_constraints,
            connections,
        }
    }

    /// Spawns a task which will call
    /// [`PeerConnection::scrape_and_send_peer_stats()`] of all
    /// [`PeerConnection`]s every second and send updated
    /// [`platform::RtcStats`] to the server.
    ///
    /// Returns a [`TaskHandle`] which will stop this task on its [`Drop`].
    fn spawn_peers_stats_scrape_task(
        peers: Rc<RefCell<HashMap<PeerId, peer::Component>>>,
    ) -> TaskHandle {
        let (fut, abort) = future::abortable(async move {
            #[expect( // intentional
                clippy::infinite_loop,
                reason = "cannot annotate `async` block with `-> !`"
            )]
            loop {
                platform::delay_for(Duration::from_secs(1)).await;

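                // Collect the `PeerConnection`s into a `Vec` first, so the
                // `RefCell` borrow is released before `.await`ing.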
                let peers = peers
                    .borrow()
                    .values()
                    .map(component::Component::obj)
                    .collect::<Vec<_>>();
                drop(
                    future::join_all(
                        peers.iter().map(|p| p.scrape_and_send_peer_stats()),
                    )
                    .await,
                );
            }
        });

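        // The scraping loop runs until the returned `TaskHandle` is dropped,
        // which aborts the `Abortable` future.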
        platform::spawn(async move {
            _ = fut.await.ok();
        });

        abort.into()
    }
}

impl State {
    /// Inserts the provided [`peer::State`].
    pub fn insert(&self, peer_id: PeerId, peer_state: peer::State) {
        drop(self.0.borrow_mut().insert(peer_id, Rc::new(peer_state)));
    }

    /// Looks up a [`peer::State`] by the provided [`PeerId`].
    #[must_use]
    pub fn get(&self, peer_id: PeerId) -> Option<Rc<peer::State>> {
        self.0.borrow().get(&peer_id).cloned()
    }

    /// Removes [`peer::State`] with the provided [`PeerId`].
    pub fn remove(&self, peer_id: PeerId) {
        drop(self.0.borrow_mut().remove(&peer_id));
    }
}

impl AsProtoState for State {
    type Output = proto::state::Room;

    fn as_proto(&self) -> Self::Output {
        Self::Output {
            peers: self
                .0
                .borrow()
                .iter()
                .map(|(id, p)| (*id, p.as_proto()))
                .collect(),
        }
    }
}

#[watchers]
impl Component {
    /// Watches for new [`peer::State`] insertions.
    ///
    /// Creates new [`peer::Component`] based on the inserted [`peer::State`].
    #[watch(self.0.borrow().on_insert())]
    async fn peer_added(
        peers: Rc<Repository>,
        _: Rc<State>,
        (peer_id, new_peer): (PeerId, Rc<peer::State>),
    ) -> Result<(), Traced<RtcPeerConnectionError>> {
        let peer = peer::Component::new(
            PeerConnection::new(
                &new_peer,
                peers.peer_event_sender.clone(),
                Rc::clone(&peers.media_manager),
                peers.send_constraints.clone(),
                Rc::clone(&peers.connections),
                Rc::clone(&peers.recv_constraints),
            )
            .await
            .map_err(tracerr::map_from_and_wrap!())?,
            new_peer,
        );

        drop(peers.peers.borrow_mut().insert(peer_id, peer));

        Ok(())
    }

    /// Watches for a [`peer::State`] removal.
    ///
    /// Removes the [`peer::Component`] and removes all its send and receive
    /// tracks from the corresponding [`Connection`]s.
    ///
    /// [`Connection`]: crate::connection::Connection
    #[watch(self.0.borrow().on_remove())]
    fn peer_removed(
        peers: &Repository,
        _: &State,
        (peer_id, peer): (PeerId, Rc<peer::State>),
    ) {
        drop(peers.peers.borrow_mut().remove(&peer_id));
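        // Detach all of the removed peer's tracks from their `Connection`s.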
        for t in peer.get_recv_tracks() {
            peers.connections.remove_track(&t);
        }
        for t in peer.get_send_tracks() {
            peers.connections.remove_track(&t);
        }
    }
}