atm0s_media_server_cluster_local/
lib.rs

1use std::sync::Arc;
2
3use async_std::channel::{Receiver, Sender};
4use cluster::{
5    generate_cluster_track_uuid, Cluster, ClusterEndpoint, ClusterEndpointError, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterLocalTrackOutgoingEvent,
6    ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent,
7};
8use event_hub::LocalEventHub;
9use media_hub::LocalMediaHub;
10use media_utils::{hash_str, ResourceTracking};
11use parking_lot::RwLock;
12
13mod event_hub;
14mod media_hub;
15
16pub struct PeerLocal {
17    peer_id_hash: u64,
18    room_id: String,
19    peer_id: String,
20    event_hub: Arc<RwLock<LocalEventHub>>,
21    media_hub: Arc<RwLock<LocalMediaHub>>,
22    tx: Sender<ClusterEndpointIncomingEvent>,
23    rx: Receiver<ClusterEndpointIncomingEvent>,
24    tracking: ResourceTracking,
25}
26
27impl PeerLocal {
28    pub fn new(event_hub: Arc<RwLock<LocalEventHub>>, media_hub: Arc<RwLock<LocalMediaHub>>, room_id: &str, peer_id: &str) -> Self {
29        let (tx, rx) = async_std::channel::bounded(1000);
30        log::debug!("[PeerLocal {}/{}] created", room_id, peer_id);
31        Self {
32            peer_id_hash: hash_str(&format!("{}-{}", room_id, peer_id)) << 16,
33            event_hub,
34            media_hub,
35            room_id: room_id.into(),
36            peer_id: peer_id.into(),
37            tx,
38            rx,
39            tracking: Default::default(),
40        }
41    }
42}
43
44#[async_trait::async_trait]
45impl ClusterEndpoint for PeerLocal {
46    fn on_event(&mut self, event: ClusterEndpointOutgoingEvent) -> Result<(), ClusterEndpointError> {
47        match event {
48            ClusterEndpointOutgoingEvent::SubscribeRoom => {
49                self.tracking.add("sub-room");
50                self.event_hub.write().subscribe_room(&self.room_id, self.peer_id_hash as u32, self.tx.clone());
51            }
52            ClusterEndpointOutgoingEvent::UnsubscribeRoom => {
53                self.tracking.remove("sub-room");
54                self.event_hub.write().unsubscribe_room(&self.room_id, self.peer_id_hash as u32);
55            }
56            ClusterEndpointOutgoingEvent::SubscribePeer(peer_id) => {
57                self.tracking.add2("sub-peer", &peer_id);
58                self.event_hub.write().subscribe_peer(&self.room_id, &peer_id, self.peer_id_hash as u32, self.tx.clone());
59            }
60            ClusterEndpointOutgoingEvent::UnsubscribePeer(peer_id) => {
61                self.tracking.remove2("sub-peer", &peer_id);
62                self.event_hub.write().unsubscribe_peer(&self.room_id, &peer_id, self.peer_id_hash as u32);
63            }
64            ClusterEndpointOutgoingEvent::LocalTrackEvent(track_id, event) => match event {
65                ClusterLocalTrackOutgoingEvent::Subscribe(peer_id, track_name) => {
66                    self.tracking.add3("sub-track", &peer_id, &track_name);
67                    let consumer_id = self.peer_id_hash | track_id as u64;
68                    let track_uuid = generate_cluster_track_uuid(&self.room_id, &peer_id, &track_name);
69                    self.media_hub.write().subscribe(track_uuid, consumer_id, self.tx.clone());
70                }
71                ClusterLocalTrackOutgoingEvent::Unsubscribe(peer_id, track_name) => {
72                    self.tracking.remove3("sub-track", &peer_id, &track_name);
73                    let consumer_id = self.peer_id_hash | track_id as u64;
74                    let track_uuid = generate_cluster_track_uuid(&self.room_id, &peer_id, &track_name);
75                    self.media_hub.write().unsubscribe(track_uuid, consumer_id);
76                }
77                ClusterLocalTrackOutgoingEvent::RequestKeyFrame(kind) => {
78                    let consumer_id = self.peer_id_hash | track_id as u64;
79                    self.media_hub.read().forward(consumer_id, ClusterRemoteTrackIncomingEvent::RequestKeyFrame(kind));
80                }
81                ClusterLocalTrackOutgoingEvent::LimitBitrate(bitrate) => {
82                    //TODO aggerate all consumers bitrate
83                }
84            },
85            ClusterEndpointOutgoingEvent::RemoteTrackEvent(track_id, cluster_track_uuid, event) => match event {
86                ClusterRemoteTrackOutgoingEvent::TrackAdded(track_name, track_meta) => {
87                    self.tracking.add2("track", &track_name);
88                    let track_uuid = generate_cluster_track_uuid(&self.room_id, &self.peer_id, &track_name);
89                    self.event_hub.write().add_track(&self.room_id, &self.peer_id, &track_name, track_meta);
90                    self.media_hub.write().add_track(track_uuid, track_id, self.tx.clone());
91                }
92                ClusterRemoteTrackOutgoingEvent::TrackMedia(pkt) => {
93                    self.media_hub.write().relay(cluster_track_uuid, pkt);
94                }
95                ClusterRemoteTrackOutgoingEvent::TrackStats(stats) => {
96                    self.media_hub.write().relay_stats(cluster_track_uuid, stats);
97                }
98                ClusterRemoteTrackOutgoingEvent::TrackRemoved(track_name) => {
99                    self.tracking.remove2("track", &track_name);
100                    let track_uuid = generate_cluster_track_uuid(&self.room_id, &self.peer_id, &track_name);
101                    self.event_hub.write().remove_track(&self.room_id, &self.peer_id, &track_name);
102                    self.media_hub.write().remove_track(track_uuid);
103                }
104            },
105        }
106        Ok(())
107    }
108
109    async fn recv(&mut self) -> Result<ClusterEndpointIncomingEvent, ClusterEndpointError> {
110        self.rx.recv().await.map_err(|_| ClusterEndpointError::InternalError)
111    }
112}
113
114impl Drop for PeerLocal {
115    fn drop(&mut self) {
116        log::debug!("[PeerLocal {}/{}] drop", self.room_id, self.peer_id);
117        if !self.tracking.is_empty() {
118            log::error!("PeerLocal {}-{} tracking not empty: {}", self.room_id, self.peer_id, self.tracking.dump());
119        }
120        assert!(self.tracking.is_empty());
121    }
122}
123
124pub struct ServerLocal {
125    event_hub: Arc<RwLock<LocalEventHub>>,
126    media_hub: Arc<RwLock<LocalMediaHub>>,
127}
128
129impl ServerLocal {
130    pub fn new() -> Self {
131        Self {
132            event_hub: Arc::new(RwLock::new(LocalEventHub::default())),
133            media_hub: Arc::new(RwLock::new(LocalMediaHub::default())),
134        }
135    }
136}
137
138impl Cluster<PeerLocal> for ServerLocal {
139    fn build(&mut self, room_id: &str, peer_id: &str) -> PeerLocal {
140        PeerLocal::new(self.event_hub.clone(), self.media_hub.clone(), room_id, peer_id)
141    }
142}
143
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_std::prelude::FutureExt;
    use cluster::{
        generate_cluster_track_uuid, Cluster, ClusterEndpoint, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta,
    };

    use super::ServerLocal;

    /// Upper bound on how long a test waits for an expected incoming event.
    const RECV_TIMEOUT: Duration = Duration::from_secs(1);

    /// A peer subscribed to a room is told when another peer in that room
    /// adds and later removes a track.
    #[async_std::test]
    async fn subscribe_room() {
        let mut server = ServerLocal::new();
        let mut peer1 = server.build("room", "peer1");
        let mut peer2 = server.build("room", "peer2");
        let cluster_track_uuid = generate_cluster_track_uuid("room", "peer2", "audio_main");

        // Wraps a remote-track event of peer2's single track (local track id 1).
        let remote_track = |event: ClusterRemoteTrackOutgoingEvent| ClusterEndpointOutgoingEvent::RemoteTrackEvent(1, cluster_track_uuid, event);

        peer1.on_event(ClusterEndpointOutgoingEvent::SubscribeRoom).unwrap();

        let meta = ClusterTrackMeta::default_audio();
        let added = ClusterRemoteTrackOutgoingEvent::TrackAdded("audio_main".to_string(), meta.clone());
        peer2.on_event(remote_track(added)).unwrap();

        let expected_added = ClusterEndpointIncomingEvent::PeerTrackAdded("peer2".to_string(), "audio_main".to_string(), meta.clone());
        assert_eq!(peer1.recv().timeout(RECV_TIMEOUT).await, Ok(Ok(expected_added)));

        let removed = ClusterRemoteTrackOutgoingEvent::TrackRemoved("audio_main".to_string());
        peer2.on_event(remote_track(removed)).unwrap();

        let expected_removed = ClusterEndpointIncomingEvent::PeerTrackRemoved("peer2".to_string(), "audio_main".to_string());
        assert_eq!(peer1.recv().timeout(RECV_TIMEOUT).await, Ok(Ok(expected_removed)));

        // Release the room subscription so peer1's resource tracking is empty
        // again before it is dropped.
        peer1.on_event(ClusterEndpointOutgoingEvent::UnsubscribeRoom).unwrap();
    }
}