atm0s_media_server_cluster_local/
lib.rs1use std::sync::Arc;
2
3use async_std::channel::{Receiver, Sender};
4use cluster::{
5 generate_cluster_track_uuid, Cluster, ClusterEndpoint, ClusterEndpointError, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterLocalTrackOutgoingEvent,
6 ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent,
7};
8use event_hub::LocalEventHub;
9use media_hub::LocalMediaHub;
10use media_utils::{hash_str, ResourceTracking};
11use parking_lot::RwLock;
12
13mod event_hub;
14mod media_hub;
15
16pub struct PeerLocal {
17 peer_id_hash: u64,
18 room_id: String,
19 peer_id: String,
20 event_hub: Arc<RwLock<LocalEventHub>>,
21 media_hub: Arc<RwLock<LocalMediaHub>>,
22 tx: Sender<ClusterEndpointIncomingEvent>,
23 rx: Receiver<ClusterEndpointIncomingEvent>,
24 tracking: ResourceTracking,
25}
26
27impl PeerLocal {
28 pub fn new(event_hub: Arc<RwLock<LocalEventHub>>, media_hub: Arc<RwLock<LocalMediaHub>>, room_id: &str, peer_id: &str) -> Self {
29 let (tx, rx) = async_std::channel::bounded(1000);
30 log::debug!("[PeerLocal {}/{}] created", room_id, peer_id);
31 Self {
32 peer_id_hash: hash_str(&format!("{}-{}", room_id, peer_id)) << 16,
33 event_hub,
34 media_hub,
35 room_id: room_id.into(),
36 peer_id: peer_id.into(),
37 tx,
38 rx,
39 tracking: Default::default(),
40 }
41 }
42}
43
44#[async_trait::async_trait]
45impl ClusterEndpoint for PeerLocal {
46 fn on_event(&mut self, event: ClusterEndpointOutgoingEvent) -> Result<(), ClusterEndpointError> {
47 match event {
48 ClusterEndpointOutgoingEvent::SubscribeRoom => {
49 self.tracking.add("sub-room");
50 self.event_hub.write().subscribe_room(&self.room_id, self.peer_id_hash as u32, self.tx.clone());
51 }
52 ClusterEndpointOutgoingEvent::UnsubscribeRoom => {
53 self.tracking.remove("sub-room");
54 self.event_hub.write().unsubscribe_room(&self.room_id, self.peer_id_hash as u32);
55 }
56 ClusterEndpointOutgoingEvent::SubscribePeer(peer_id) => {
57 self.tracking.add2("sub-peer", &peer_id);
58 self.event_hub.write().subscribe_peer(&self.room_id, &peer_id, self.peer_id_hash as u32, self.tx.clone());
59 }
60 ClusterEndpointOutgoingEvent::UnsubscribePeer(peer_id) => {
61 self.tracking.remove2("sub-peer", &peer_id);
62 self.event_hub.write().unsubscribe_peer(&self.room_id, &peer_id, self.peer_id_hash as u32);
63 }
64 ClusterEndpointOutgoingEvent::LocalTrackEvent(track_id, event) => match event {
65 ClusterLocalTrackOutgoingEvent::Subscribe(peer_id, track_name) => {
66 self.tracking.add3("sub-track", &peer_id, &track_name);
67 let consumer_id = self.peer_id_hash | track_id as u64;
68 let track_uuid = generate_cluster_track_uuid(&self.room_id, &peer_id, &track_name);
69 self.media_hub.write().subscribe(track_uuid, consumer_id, self.tx.clone());
70 }
71 ClusterLocalTrackOutgoingEvent::Unsubscribe(peer_id, track_name) => {
72 self.tracking.remove3("sub-track", &peer_id, &track_name);
73 let consumer_id = self.peer_id_hash | track_id as u64;
74 let track_uuid = generate_cluster_track_uuid(&self.room_id, &peer_id, &track_name);
75 self.media_hub.write().unsubscribe(track_uuid, consumer_id);
76 }
77 ClusterLocalTrackOutgoingEvent::RequestKeyFrame(kind) => {
78 let consumer_id = self.peer_id_hash | track_id as u64;
79 self.media_hub.read().forward(consumer_id, ClusterRemoteTrackIncomingEvent::RequestKeyFrame(kind));
80 }
81 ClusterLocalTrackOutgoingEvent::LimitBitrate(bitrate) => {
82 }
84 },
85 ClusterEndpointOutgoingEvent::RemoteTrackEvent(track_id, cluster_track_uuid, event) => match event {
86 ClusterRemoteTrackOutgoingEvent::TrackAdded(track_name, track_meta) => {
87 self.tracking.add2("track", &track_name);
88 let track_uuid = generate_cluster_track_uuid(&self.room_id, &self.peer_id, &track_name);
89 self.event_hub.write().add_track(&self.room_id, &self.peer_id, &track_name, track_meta);
90 self.media_hub.write().add_track(track_uuid, track_id, self.tx.clone());
91 }
92 ClusterRemoteTrackOutgoingEvent::TrackMedia(pkt) => {
93 self.media_hub.write().relay(cluster_track_uuid, pkt);
94 }
95 ClusterRemoteTrackOutgoingEvent::TrackStats(stats) => {
96 self.media_hub.write().relay_stats(cluster_track_uuid, stats);
97 }
98 ClusterRemoteTrackOutgoingEvent::TrackRemoved(track_name) => {
99 self.tracking.remove2("track", &track_name);
100 let track_uuid = generate_cluster_track_uuid(&self.room_id, &self.peer_id, &track_name);
101 self.event_hub.write().remove_track(&self.room_id, &self.peer_id, &track_name);
102 self.media_hub.write().remove_track(track_uuid);
103 }
104 },
105 }
106 Ok(())
107 }
108
109 async fn recv(&mut self) -> Result<ClusterEndpointIncomingEvent, ClusterEndpointError> {
110 self.rx.recv().await.map_err(|_| ClusterEndpointError::InternalError)
111 }
112}
113
114impl Drop for PeerLocal {
115 fn drop(&mut self) {
116 log::debug!("[PeerLocal {}/{}] drop", self.room_id, self.peer_id);
117 if !self.tracking.is_empty() {
118 log::error!("PeerLocal {}-{} tracking not empty: {}", self.room_id, self.peer_id, self.tracking.dump());
119 }
120 assert!(self.tracking.is_empty());
121 }
122}
123
124pub struct ServerLocal {
125 event_hub: Arc<RwLock<LocalEventHub>>,
126 media_hub: Arc<RwLock<LocalMediaHub>>,
127}
128
129impl ServerLocal {
130 pub fn new() -> Self {
131 Self {
132 event_hub: Arc::new(RwLock::new(LocalEventHub::default())),
133 media_hub: Arc::new(RwLock::new(LocalMediaHub::default())),
134 }
135 }
136}
137
138impl Cluster<PeerLocal> for ServerLocal {
139 fn build(&mut self, room_id: &str, peer_id: &str) -> PeerLocal {
140 PeerLocal::new(self.event_hub.clone(), self.media_hub.clone(), room_id, peer_id)
141 }
142}
143
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_std::prelude::FutureExt;
    use cluster::{generate_cluster_track_uuid, Cluster, ClusterEndpoint, ClusterEndpointIncomingEvent, ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta};

    /// A peer subscribed to a room is notified when another peer of that room
    /// adds and then removes a track.
    #[async_std::test]
    async fn subscribe_room() {
        const ROOM: &str = "room";
        const PUBLISHER: &str = "peer2";
        const TRACK: &str = "audio_main";
        let wait = Duration::from_secs(1);

        let mut server_local = super::ServerLocal::new();
        let mut peer1 = server_local.build(ROOM, "peer1");
        let mut peer2 = server_local.build(ROOM, PUBLISHER);
        let cluster_track_uuid = generate_cluster_track_uuid(ROOM, PUBLISHER, TRACK);

        peer1.on_event(cluster::ClusterEndpointOutgoingEvent::SubscribeRoom).unwrap();

        // The publisher announces its track; the room subscriber must see it.
        let meta = ClusterTrackMeta::default_audio();
        let track_added = cluster::ClusterEndpointOutgoingEvent::RemoteTrackEvent(
            1,
            cluster_track_uuid,
            ClusterRemoteTrackOutgoingEvent::TrackAdded(TRACK.to_string(), meta.clone()),
        );
        peer2.on_event(track_added).unwrap();

        let received = peer1.recv().timeout(wait).await;
        assert_eq!(
            received,
            Ok(Ok(ClusterEndpointIncomingEvent::PeerTrackAdded(PUBLISHER.to_string(), TRACK.to_string(), meta.clone())))
        );

        // The publisher withdraws the track; the room subscriber must see that too.
        let track_removed = cluster::ClusterEndpointOutgoingEvent::RemoteTrackEvent(
            1,
            cluster_track_uuid,
            ClusterRemoteTrackOutgoingEvent::TrackRemoved(TRACK.to_string()),
        );
        peer2.on_event(track_removed).unwrap();

        let received = peer1.recv().timeout(wait).await;
        assert_eq!(
            received,
            Ok(Ok(ClusterEndpointIncomingEvent::PeerTrackRemoved(PUBLISHER.to_string(), TRACK.to_string())))
        );

        // Release the room subscription so the endpoint's tracking is empty on drop.
        peer1.on_event(cluster::ClusterEndpointOutgoingEvent::UnsubscribeRoom).unwrap();
    }
}