1use std::collections::HashMap;
2
3use crate::{
4 implement::types::to_room_streams_map_key,
5 rpc::{connector::MediaEndpointLogResponse, RPC_MEDIA_ENDPOINT_LOG},
6 ClusterEndpoint, ClusterEndpointError, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterRemoteTrackIncomingEvent,
7 ClusterRemoteTrackOutgoingEvent, ClusterTrackUuid, CONNECTOR_SERVICE,
8};
9use async_std::channel::{bounded, Receiver, Sender};
10use atm0s_sdn::{
11 ChannelUuid, ConsumerRaw, Feedback, FeedbackType, KeyId, KeySource, KeyValueSdk, KeyVersion, LocalSubId, NodeId, NumberInfo, PublisherRaw, PubsubSdk, RouteRule, RpcEmitter, SubKeyId, ValueType,
12};
13use bytes::Bytes;
14use futures::{select, FutureExt};
15use media_utils::{hash_str, ErrorDebugger};
16use transport::RequestKeyframeKind;
17
18use super::types::{from_peer_value, from_stream_value, to_peer_sub_key, to_peer_value, to_room_peers_map_key, to_stream_sub_key, to_stream_value, TrackData};
19
/// Identifier carried in the `id` slot of a pub/sub feedback message so the
/// publishing side can tell which kind of track feedback it received.
///
/// The discriminants are the values sent on the wire (`as u8`) and decoded
/// back through `TryFrom<u8>`, so they must stay stable.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrackFeedbackType {
    /// A consumer asks the publisher for a key frame.
    RequestKeyFrame = 0,
    /// A consumer asks the publisher to limit its bitrate.
    LimitBitrate = 1,
}
25
26impl TryFrom<u8> for TrackFeedbackType {
27 type Error = ();
28
29 fn try_from(value: u8) -> Result<Self, Self::Error> {
30 match value {
31 0 => Ok(TrackFeedbackType::RequestKeyFrame),
32 1 => Ok(TrackFeedbackType::LimitBitrate),
33 _ => Err(()),
34 }
35 }
36}
37
/// `ClusterEndpoint` implementation built on the atm0s-sdn key-value and
/// pub/sub SDKs.
///
/// Peer and stream metadata are stored in / observed from key-value hash maps,
/// while track media and track feedback travel over pub/sub channels. The
/// channel pairs below are handed to the SDKs as sinks and drained in `recv`.
pub struct ClusterEndpointSdn {
    /// Room id given to `new`.
    room_id: String,
    /// Local peer id given to `new`.
    peer_id: String,
    /// Key of the room peers hash map, computed by `to_room_peers_map_key(room_id)`.
    room_peers_map_key: u64,
    /// Key of the room streams hash map, computed by `to_room_streams_map_key(room_id)`.
    room_streams_map_key: u64,
    /// Subscriber id passed to every `hsubscribe_raw` / `hunsubscribe_raw` call;
    /// it is `hash_str("{room_id}/{peer_id}")`.
    sub_uuid: u64,
    /// Pub/sub SDK used to create raw consumers and publishers.
    pubsub_sdk: PubsubSdk,
    /// Key-value SDK used for `hset` / `hdel` / `hsubscribe_raw` / `hunsubscribe_raw`.
    kv_sdk: KeyValueSdk,
    /// Sender handed to the key-value SDK when subscribing to the room peers map.
    kv_peers_tx: Sender<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>,
    /// Receiving side of `kv_peers_tx`, polled in `recv`.
    kv_peers_rx: Receiver<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>,
    /// Sender handed to the key-value SDK for both the room streams map and
    /// single-peer stream maps.
    kv_streams_tx: Sender<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>,
    /// Receiving side of `kv_streams_tx`, polled in `recv`.
    kv_streams_rx: Receiver<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>,
    /// Sender handed to every raw consumer created for a subscribed track.
    data_tx: Sender<(LocalSubId, NodeId, ChannelUuid, Bytes)>,
    /// Receiving side of `data_tx`, polled in `recv`.
    data_rx: Receiver<(LocalSubId, NodeId, ChannelUuid, Bytes)>,
    /// Sender handed to every raw publisher created for a published track.
    data_fb_tx: Sender<Feedback>,
    /// Receiving side of `data_fb_tx`, polled in `recv`.
    data_fb_rx: Receiver<Feedback>,
    /// Consumer uuid -> local track id; used to route incoming track data.
    consumer_map: HashMap<u64, u16>,
    /// Local track id -> consumers of that track, keyed by the subscribed track uuid.
    track_sub_map: HashMap<u16, HashMap<ClusterTrackUuid, ConsumerRaw>>,
    /// `Some(())` while subscribed to the room peers map.
    room_peers_sub: Option<()>,
    /// `Some(())` while subscribed to the room streams map.
    room_streams_sub: Option<()>,
    /// Peer ids with an active single-peer stream subscription (used as a set).
    peer_sub: HashMap<String, ()>,
    /// Channel uuid -> (track id, publisher) for every track published by this endpoint.
    track_pub: HashMap<ChannelUuid, (u16, PublisherRaw)>,
    /// Key-value sub-key -> peer id for peers already reported by `recv`; lets
    /// `recv` tell add from update and name the peer on removal.
    remote_peer_cached: HashMap<u64, String>,
    /// Key-value sub-key -> (peer id, track name) for streams already reported by `recv`.
    remote_track_cached: HashMap<u64, (String, String)>,
    /// RPC emitter used to forward media endpoint log events to the connector service.
    rpc_emitter: RpcEmitter,
}
64
65impl ClusterEndpointSdn {
66 pub(crate) fn new(room_id: &str, peer_id: &str, pubsub_sdk: PubsubSdk, kv_sdk: KeyValueSdk, rpc_emitter: RpcEmitter) -> Self {
67 let (kv_peers_tx, kv_peers_rx) = bounded(100);
68 let (kv_streams_tx, kv_streams_rx) = bounded(100);
69 let (data_tx, data_rx) = bounded(1000);
70 let (data_fb_tx, data_fb_rx) = bounded(100);
71 let room_streams_map_key = to_room_streams_map_key(room_id);
72 let room_peers_map_key = to_room_peers_map_key(room_id);
73 log::info!(
74 "[Atm0sClusterEndpoint] create endpoint {}/{} room_peers_key {}, room_streams_key {}",
75 room_id,
76 peer_id,
77 room_streams_map_key,
78 room_peers_map_key
79 );
80
81 Self {
82 room_id: room_id.to_string(),
83 peer_id: peer_id.to_string(),
84 room_streams_map_key,
85 room_peers_map_key,
86 sub_uuid: hash_str(&format!("{}/{}", room_id, peer_id)),
87 pubsub_sdk,
88 kv_sdk,
89 kv_peers_tx,
90 kv_peers_rx,
91 kv_streams_tx,
92 kv_streams_rx,
93 data_tx,
94 data_rx,
95 data_fb_tx,
96 data_fb_rx,
97 track_sub_map: Default::default(),
98 consumer_map: Default::default(),
99 room_peers_sub: None,
100 room_streams_sub: None,
101 peer_sub: Default::default(),
102 track_pub: Default::default(),
103 remote_peer_cached: Default::default(),
104 remote_track_cached: Default::default(),
105 rpc_emitter,
106 }
107 }
108
109 fn peer_key(&self, peer_id: &str) -> u64 {
110 hash_str(&format!("{}/{}", self.room_id, peer_id))
111 }
112}
113
114#[async_trait::async_trait]
115impl ClusterEndpoint for ClusterEndpointSdn {
116 fn on_event(&mut self, event: ClusterEndpointOutgoingEvent) -> Result<(), ClusterEndpointError> {
117 match event {
118 ClusterEndpointOutgoingEvent::InfoSet(meta) => {
119 let (sub_key, value) = to_peer_value(&self.peer_id, meta);
120 self.kv_sdk.hset(self.room_peers_map_key, sub_key, value.clone(), Some(10000));
121 Ok(())
122 }
123 ClusterEndpointOutgoingEvent::InfoUpdate(meta) => {
124 let (sub_key, value) = to_peer_value(&self.peer_id, meta);
125 self.kv_sdk.hset(self.room_peers_map_key, sub_key, value.clone(), Some(10000));
126 Ok(())
127 }
128 ClusterEndpointOutgoingEvent::InfoRemove => {
129 self.kv_sdk.hdel(self.room_peers_map_key, to_peer_sub_key(&self.peer_id));
130 Ok(())
131 }
132 ClusterEndpointOutgoingEvent::SubscribeRoomPeers => {
133 if self.room_peers_sub.is_none() {
134 log::info!("[Atm0sClusterEndpoint] sub room peers {}", self.room_peers_map_key);
135 self.kv_sdk.hsubscribe_raw(self.room_peers_map_key, self.sub_uuid, Some(10000), self.kv_peers_tx.clone());
136 self.room_peers_sub = Some(());
137 } else {
138 log::warn!("[Atm0sClusterEndpoint] sub room peers but already exist");
139 }
140 Ok(())
141 }
142 ClusterEndpointOutgoingEvent::UnsubscribeRoomPeers => {
143 if self.room_peers_sub.take().is_some() {
144 log::info!("[Atm0sClusterEndpoint] unsub room peers {}", self.room_peers_map_key);
145 self.kv_sdk.hunsubscribe_raw(self.room_peers_map_key, self.sub_uuid);
146 } else {
147 log::warn!("[Atm0sClusterEndpoint] unsub room peers but not found");
148 }
149 Ok(())
150 }
151 ClusterEndpointOutgoingEvent::SubscribeRoomStreams => {
152 if self.peer_sub.is_empty() && self.room_streams_sub.is_none() {
153 log::info!("[Atm0sClusterEndpoint] sub room streams {}", self.room_streams_map_key);
154 self.kv_sdk.hsubscribe_raw(self.room_streams_map_key, self.sub_uuid, Some(10000), self.kv_streams_tx.clone());
155 self.room_streams_sub = Some(());
156 } else {
157 log::warn!("[Atm0sClusterEndpoint] sub room but already exist");
158 }
159 Ok(())
160 }
161 ClusterEndpointOutgoingEvent::UnsubscribeRoomStreams => {
162 if self.peer_sub.is_empty() && self.room_streams_sub.take().is_some() {
163 log::info!("[Atm0sClusterEndpoint] unsub room streams {}", self.room_streams_map_key);
164 self.kv_sdk.hunsubscribe_raw(self.room_streams_map_key, self.sub_uuid);
165 } else {
166 log::warn!("[Atm0sClusterEndpoint] unsub room but not found");
167 }
168 Ok(())
169 }
170 ClusterEndpointOutgoingEvent::SubscribeSinglePeer(peer_id) => {
171 if self.room_streams_sub.is_none() && !self.peer_sub.contains_key(&peer_id) {
172 log::warn!("[Atm0sClusterEndpoint] sub peer streams {}, key {}", peer_id, self.peer_key(&peer_id));
173 self.kv_sdk.hsubscribe_raw(self.peer_key(&peer_id), self.sub_uuid, Some(10000), self.kv_streams_tx.clone());
174 self.peer_sub.insert(peer_id, ());
175 } else {
176 log::warn!("[Atm0sClusterEndpoint] sub peer but already exist {peer_id}");
177 }
178 Ok(())
179 }
180 ClusterEndpointOutgoingEvent::UnsubscribeSinglePeer(peer_id) => {
181 if self.room_streams_sub.is_none() && self.peer_sub.remove(&peer_id).is_some() {
182 log::warn!("[Atm0sClusterEndpoint] unsub peer streams {}, key {}", peer_id, self.peer_key(&peer_id));
183 self.kv_sdk.hunsubscribe_raw(self.peer_key(&peer_id), self.sub_uuid);
184 } else {
185 log::warn!("[Atm0sClusterEndpoint] unsub peer but not found {peer_id}");
186 }
187 Ok(())
188 }
189 ClusterEndpointOutgoingEvent::LocalTrackEvent(track_id, event) => match event {
190 ClusterLocalTrackOutgoingEvent::RequestKeyFrame(kind) => {
191 let value = match kind {
192 RequestKeyframeKind::Fir => 1,
193 RequestKeyframeKind::Pli => 2,
194 } as i64;
195 if let Some(consumers) = self.track_sub_map.get(&track_id) {
196 for (_, consumer) in consumers {
197 log::debug!("[Atm0sClusterEndpoint] send track feedback RequestKeyFrame {track_id} => {:?}", consumer.uuid());
198 consumer.feedback(
199 TrackFeedbackType::RequestKeyFrame as u8,
200 FeedbackType::Number {
201 window_ms: 200,
202 info: NumberInfo {
203 count: 1,
204 sum: value,
205 max: value,
206 min: value,
207 },
208 },
209 )
210 }
211 } else {
212 log::warn!("[Atm0sClusterEndpoint] send track feedback RequestKeyFrame but track not found {track_id}");
213 }
214 Ok(())
215 }
216 ClusterLocalTrackOutgoingEvent::LimitBitrate(bitrate) => {
217 if let Some(consumers) = self.track_sub_map.get(&track_id) {
218 for (_, consumer) in consumers {
219 log::debug!("[Atm0sClusterEndpoint] send track feedback LimitBitrate({bitrate}) {track_id} => {:?}", consumer.uuid());
220 consumer.feedback(
221 TrackFeedbackType::LimitBitrate as u8,
222 FeedbackType::Number {
223 window_ms: 200,
224 info: NumberInfo {
225 count: 1,
226 sum: bitrate as i64,
227 max: bitrate as i64,
228 min: bitrate as i64,
229 },
230 },
231 )
232 }
233 } else {
234 log::warn!("[Atm0sClusterEndpoint] send track feedback LimitBitrate({bitrate}) but track not found {track_id}");
235 }
236 Ok(())
237 }
238 ClusterLocalTrackOutgoingEvent::Subscribe(peer_id, track_name) => {
239 let track_uuid = ClusterTrackUuid::from_info(&self.room_id, &peer_id, &track_name);
240 let consumer = self.pubsub_sdk.create_consumer_raw(*track_uuid as ChannelUuid, self.data_tx.clone());
241 log::info!("[Atm0sClusterEndpoint] sub track {peer_id} {track_name} => track_uuid {} consumer_id {}", *track_uuid, consumer.uuid());
242 self.consumer_map.insert(consumer.uuid(), track_id);
243 let entry = self.track_sub_map.entry(track_id).or_insert_with(Default::default);
244 entry.insert(track_uuid, consumer);
245 Ok(())
246 }
247 ClusterLocalTrackOutgoingEvent::Unsubscribe(peer_id, track_name) => {
248 let track_uuid = ClusterTrackUuid::from_info(&self.room_id, &peer_id, &track_name);
249 if let Some(consumers) = self.track_sub_map.get_mut(&track_id) {
250 if let Some(consumer) = consumers.remove(&track_uuid) {
251 log::info!(
252 "[Atm0sClusterEndpoint] unsub track {peer_id} {track_name} => track_uuid {} consumer_id {}",
253 *track_uuid,
254 consumer.uuid()
255 );
256 self.consumer_map.remove(&consumer.uuid());
257 if consumers.is_empty() {
258 self.track_sub_map.remove(&track_id);
259 }
260 } else {
261 log::warn!("[Atm0sClusterEndpoint] unsub track but not found {peer_id} {track_name} => track_uuid {}", *track_uuid);
262 }
263 } else {
264 log::warn!("[Atm0sClusterEndpoint] unsub track but not found {peer_id} {track_name} => track_uuid {}", *track_uuid);
265 }
266 Ok(())
267 }
268 },
269 ClusterEndpointOutgoingEvent::RemoteTrackEvent(track_id, track_uuid, event) => {
270 let channel_uuid = *track_uuid;
271 match event {
272 ClusterRemoteTrackOutgoingEvent::TrackAdded(track_name, track_meta) => {
273 if !self.track_pub.contains_key(&channel_uuid) {
274 let (sub_key, value) = to_stream_value(&self.peer_id, &track_name, track_meta);
275 self.track_pub
276 .insert(channel_uuid, (track_id, self.pubsub_sdk.create_publisher_raw(channel_uuid, self.data_fb_tx.clone())));
277
278 self.kv_sdk.hset(self.room_streams_map_key, sub_key, value.clone(), Some(10000));
280 self.kv_sdk.hset(self.peer_key(&self.peer_id), sub_key, value, Some(10000));
282 log::info!(
283 "[Atm0sClusterEndpoint] add track {} {track_name} => track_uuid {channel_uuid} track_id {track_id}, set sub-key in hash {} and {}",
284 self.peer_id,
285 self.room_streams_map_key,
286 self.peer_key(&self.peer_id)
287 );
288 } else {
289 log::warn!(
290 "[Atm0sClusterEndpoint] add track but already exist {} {track_name} => track_uuid {channel_uuid} track_id {track_id}",
291 self.peer_id
292 );
293 }
294 Ok(())
295 }
296 ClusterRemoteTrackOutgoingEvent::TrackMedia(media_packet) => {
297 if let Some((_, publisher)) = self.track_pub.get(&channel_uuid) {
298 if let Ok(buf) = TrackData::Media(media_packet).try_into() {
299 publisher.send(buf);
300 }
301 } else {
302 log::warn!("[Atm0sClusterEndpoint] send track media but track not found {}", channel_uuid);
303 }
304 Ok(())
305 }
306 ClusterRemoteTrackOutgoingEvent::TrackStats(stats) => {
307 if let Some((_, publisher)) = self.track_pub.get(&channel_uuid) {
308 if let Ok(buf) = TrackData::Stats(stats).try_into() {
309 publisher.send(buf);
310 }
311 } else {
312 log::warn!("[Atm0sClusterEndpoint] send track stats but track not found {}", channel_uuid);
313 }
314 Ok(())
315 }
316 ClusterRemoteTrackOutgoingEvent::TrackRemoved(track_name) => {
317 if self.track_pub.remove(&channel_uuid).is_some() {
318 let sub_key = to_stream_sub_key(&self.peer_id, &track_name);
319
320 self.kv_sdk.hdel(self.room_streams_map_key, sub_key);
322
323 self.kv_sdk.hdel(self.peer_key(&self.peer_id), sub_key);
325 log::info!(
326 "[Atm0sClusterEndpoint] delete track {} {track_name} => track_uuid {channel_uuid} track_id {track_id}, del sub-key in hash {} and {}",
327 self.peer_id,
328 self.room_streams_map_key,
329 self.peer_key(&self.peer_id)
330 );
331 } else {
332 log::warn!(
333 "[Atm0sClusterEndpoint] delete track but not found {} {track_name} => track_uuid {channel_uuid} track_id {track_id}",
334 self.peer_id
335 );
336 }
337 Ok(())
338 }
339 }
340 }
341 ClusterEndpointOutgoingEvent::MediaEndpointLog(event) => {
342 log::info!("[Atm0sClusterEndpoint] log event {:?}", event);
343 let emitter = self.rpc_emitter.clone();
344 async_std::task::spawn_local(async move {
345 emitter
346 .request::<_, MediaEndpointLogResponse>(CONNECTOR_SERVICE, RouteRule::ToService(0), RPC_MEDIA_ENDPOINT_LOG, event, 5000)
347 .await
348 .log_error("Should send event to Connector Service");
349 });
350 Ok(())
351 }
352 }
353 }
354
355 async fn recv(&mut self) -> Result<ClusterEndpointIncomingEvent, ClusterEndpointError> {
356 loop {
357 select! {
358 event = self.kv_peers_rx.recv().fuse() => match event {
359 Ok((_key, sub_key, value, _ver, _source)) => {
360 if sub_key == to_peer_sub_key(&self.peer_id) { continue;
362 }
363 if let Some(value) = value { if let Some((peer, meta)) = from_peer_value(sub_key, &value) {
365 if self.remote_peer_cached.insert(sub_key, peer.clone()).is_some() {
366 log::info!("[Atm0sClusterEndpoint] on room update peer {}", peer);
367 return Ok(ClusterEndpointIncomingEvent::PeerUpdated(peer, meta));
368 } else {
369 log::info!("[Atm0sClusterEndpoint] on room add peer {}", peer);
370 return Ok(ClusterEndpointIncomingEvent::PeerAdded(peer, meta));
371 }
372 }
373 } else { if let Some(peer) = self.remote_peer_cached.remove(&sub_key) {
375 log::info!("[Atm0sClusterEndpoint] on room remove peer {}", peer);
376 return Ok(ClusterEndpointIncomingEvent::PeerRemoved(peer));
377 }
378 }
379 }
380 Err(_e) => {
381 return Err(ClusterEndpointError::InternalError);
382 }
383 },
384 event = self.kv_streams_rx.recv().fuse() => match event {
385 Ok((_key, sub_key, value, _ver, _source)) => {
386 if let Some(value) = value { if let Some((peer, track, meta)) = from_stream_value(sub_key, &value) {
388 if self.remote_track_cached.insert(sub_key, (peer.clone(), track.clone())).is_some() {
389 log::info!("[Atm0sClusterEndpoint] on room update stream {} {}", peer, track);
390 return Ok(ClusterEndpointIncomingEvent::PeerTrackUpdated(peer, track, meta));
391 } else {
392 log::info!("[Atm0sClusterEndpoint] on room add stream {} {}", peer, track);
393 return Ok(ClusterEndpointIncomingEvent::PeerTrackAdded(peer, track, meta));
394 }
395 }
396 } else { if let Some((peer, track)) = self.remote_track_cached.remove(&sub_key) {
398 log::info!("[Atm0sClusterEndpoint] on room remove {} {}", peer, track);
399 return Ok(ClusterEndpointIncomingEvent::PeerTrackRemoved(peer, track));
400 }
401 }
402 }
403 Err(_e) => {
404 return Err(ClusterEndpointError::InternalError);
405 }
406 },
407 event = self.data_fb_rx.recv().fuse() => match event {
408 Ok(fb) => {
409 if let Some((track_id, _)) = self.track_pub.get(&fb.channel.uuid()) {
410 log::trace!("[Atm0sClusterEndpoint] recv track feedback {track_id} => {:?}", fb);
411 match (TrackFeedbackType::try_from(fb.id), fb.feedback_type) {
412 (Ok(TrackFeedbackType::LimitBitrate), FeedbackType::Number { window_ms: _, info }) => {
413 return Ok(ClusterEndpointIncomingEvent::RemoteTrackEvent(*track_id, ClusterRemoteTrackIncomingEvent::RequestLimitBitrate(info.max as u32)));
414 },
415 (Ok(TrackFeedbackType::RequestKeyFrame), FeedbackType::Number { window_ms: _, info }) => {
416 let kind = if info.sum > info.count as i64 { transport::RequestKeyframeKind::Pli
418 } else {
419 transport::RequestKeyframeKind::Fir
420 };
421 return Ok(ClusterEndpointIncomingEvent::RemoteTrackEvent(*track_id, ClusterRemoteTrackIncomingEvent::RequestKeyFrame(kind)));
422 },
423 _ => {}
424 }
425 } else {
426 log::warn!("[Atm0sClusterEndpoint] recv track feedback but track not found {}", fb.channel.uuid());
427 }
428 },
429 Err(_e) => {
430 return Err(ClusterEndpointError::InternalError);
431 }
432 },
433 event = self.data_rx.recv().fuse() => match event {
434 Ok((sub_id, _node_id, channel_uuid, data)) => {
435 if let Some(track_id) = self.consumer_map.get(&sub_id) {
436 log::trace!("[Atm0sClusterEndpoint] recv track data {sub_id} => {track_id}");
437 match TrackData::try_from(data) {
438 Ok(TrackData::Media(media_packet)) => {
439 return Ok(ClusterEndpointIncomingEvent::LocalTrackEvent(*track_id, ClusterLocalTrackIncomingEvent::MediaPacket(channel_uuid.into(), media_packet)));
440 },
441 Ok(TrackData::Stats(stats)) => {
442 return Ok(ClusterEndpointIncomingEvent::LocalTrackEvent(*track_id, ClusterLocalTrackIncomingEvent::MediaStats(channel_uuid.into(), stats)));
443 },
444 Err(_e) => {
445
446 }
447 }
448 } else {
449 log::warn!("[Atm0sClusterEndpoint] recv track data but track not found {}", sub_id);
450 }
451 },
452 Err(_e) => {
453 return Err(ClusterEndpointError::InternalError);
454 }
455 }
456 }
457 }
458 }
459}
460
461impl Drop for ClusterEndpointSdn {
462 fn drop(&mut self) {
463 assert_eq!(self.consumer_map.len(), 0);
464 assert_eq!(self.track_sub_map.len(), 0);
465 assert_eq!(self.peer_sub.len(), 0);
466 assert_eq!(self.track_pub.len(), 0);
467 assert_eq!(self.room_streams_sub, None);
468 }
469}
470
471