atm0s_media_server_cluster/implement/
endpoint.rs

1use std::collections::HashMap;
2
3use crate::{
4    implement::types::to_room_streams_map_key,
5    rpc::{connector::MediaEndpointLogResponse, RPC_MEDIA_ENDPOINT_LOG},
6    ClusterEndpoint, ClusterEndpointError, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterRemoteTrackIncomingEvent,
7    ClusterRemoteTrackOutgoingEvent, ClusterTrackUuid, CONNECTOR_SERVICE,
8};
9use async_std::channel::{bounded, Receiver, Sender};
10use atm0s_sdn::{
11    ChannelUuid, ConsumerRaw, Feedback, FeedbackType, KeyId, KeySource, KeyValueSdk, KeyVersion, LocalSubId, NodeId, NumberInfo, PublisherRaw, PubsubSdk, RouteRule, RpcEmitter, SubKeyId, ValueType,
12};
13use bytes::Bytes;
14use futures::{select, FutureExt};
15use media_utils::{hash_str, ErrorDebugger};
16use transport::RequestKeyframeKind;
17
18use super::types::{from_peer_value, from_stream_value, to_peer_sub_key, to_peer_value, to_room_peers_map_key, to_stream_sub_key, to_stream_value, TrackData};
19
/// Identifier placed in the `id` slot of a pubsub feedback message so the publishing
/// side can tell which kind of track feedback the numeric payload carries.
///
/// The discriminants are the values actually exchanged: senders encode a variant with
/// `as u8` and receivers decode it with `TryFrom<u8>`, so they must stay stable.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrackFeedbackType {
    /// A subscriber asks the publisher for a new key frame.
    RequestKeyFrame = 0,
    /// A subscriber asks the publisher to limit its bitrate.
    LimitBitrate = 1,
}

impl TryFrom<u8> for TrackFeedbackType {
    // An unknown id carries no further information, hence the unit error.
    type Error = ();

    /// Decodes a raw feedback id.
    ///
    /// Returns `Err(())` for every id that does not correspond to a variant.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Take the accepted ids from the discriminants themselves so the encoder
        // (`variant as u8`) and this decoder cannot drift apart.
        const REQUEST_KEY_FRAME: u8 = TrackFeedbackType::RequestKeyFrame as u8;
        const LIMIT_BITRATE: u8 = TrackFeedbackType::LimitBitrate as u8;

        match value {
            REQUEST_KEY_FRAME => Ok(Self::RequestKeyFrame),
            LIMIT_BITRATE => Ok(Self::LimitBitrate),
            _ => Err(()),
        }
    }
}
37
38pub struct ClusterEndpointSdn {
39    room_id: String,
40    peer_id: String,
41    room_peers_map_key: u64,
42    room_streams_map_key: u64,
43    sub_uuid: u64,
44    pubsub_sdk: PubsubSdk,
45    kv_sdk: KeyValueSdk,
46    kv_peers_tx: Sender<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>,
47    kv_peers_rx: Receiver<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>,
48    kv_streams_tx: Sender<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>,
49    kv_streams_rx: Receiver<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>,
50    data_tx: Sender<(LocalSubId, NodeId, ChannelUuid, Bytes)>,
51    data_rx: Receiver<(LocalSubId, NodeId, ChannelUuid, Bytes)>,
52    data_fb_tx: Sender<Feedback>,
53    data_fb_rx: Receiver<Feedback>,
54    consumer_map: HashMap<u64, u16>,
55    track_sub_map: HashMap<u16, HashMap<ClusterTrackUuid, ConsumerRaw>>,
56    room_peers_sub: Option<()>,
57    room_streams_sub: Option<()>,
58    peer_sub: HashMap<String, ()>,
59    track_pub: HashMap<ChannelUuid, (u16, PublisherRaw)>,
60    remote_peer_cached: HashMap<u64, String>,
61    remote_track_cached: HashMap<u64, (String, String)>,
62    rpc_emitter: RpcEmitter,
63}
64
65impl ClusterEndpointSdn {
66    pub(crate) fn new(room_id: &str, peer_id: &str, pubsub_sdk: PubsubSdk, kv_sdk: KeyValueSdk, rpc_emitter: RpcEmitter) -> Self {
67        let (kv_peers_tx, kv_peers_rx) = bounded(100);
68        let (kv_streams_tx, kv_streams_rx) = bounded(100);
69        let (data_tx, data_rx) = bounded(1000);
70        let (data_fb_tx, data_fb_rx) = bounded(100);
71        let room_streams_map_key = to_room_streams_map_key(room_id);
72        let room_peers_map_key = to_room_peers_map_key(room_id);
73        log::info!(
74            "[Atm0sClusterEndpoint] create endpoint {}/{} room_peers_key {}, room_streams_key {}",
75            room_id,
76            peer_id,
77            room_streams_map_key,
78            room_peers_map_key
79        );
80
81        Self {
82            room_id: room_id.to_string(),
83            peer_id: peer_id.to_string(),
84            room_streams_map_key,
85            room_peers_map_key,
86            sub_uuid: hash_str(&format!("{}/{}", room_id, peer_id)),
87            pubsub_sdk,
88            kv_sdk,
89            kv_peers_tx,
90            kv_peers_rx,
91            kv_streams_tx,
92            kv_streams_rx,
93            data_tx,
94            data_rx,
95            data_fb_tx,
96            data_fb_rx,
97            track_sub_map: Default::default(),
98            consumer_map: Default::default(),
99            room_peers_sub: None,
100            room_streams_sub: None,
101            peer_sub: Default::default(),
102            track_pub: Default::default(),
103            remote_peer_cached: Default::default(),
104            remote_track_cached: Default::default(),
105            rpc_emitter,
106        }
107    }
108
109    fn peer_key(&self, peer_id: &str) -> u64 {
110        hash_str(&format!("{}/{}", self.room_id, peer_id))
111    }
112}
113
114#[async_trait::async_trait]
115impl ClusterEndpoint for ClusterEndpointSdn {
116    fn on_event(&mut self, event: ClusterEndpointOutgoingEvent) -> Result<(), ClusterEndpointError> {
117        match event {
118            ClusterEndpointOutgoingEvent::InfoSet(meta) => {
119                let (sub_key, value) = to_peer_value(&self.peer_id, meta);
120                self.kv_sdk.hset(self.room_peers_map_key, sub_key, value.clone(), Some(10000));
121                Ok(())
122            }
123            ClusterEndpointOutgoingEvent::InfoUpdate(meta) => {
124                let (sub_key, value) = to_peer_value(&self.peer_id, meta);
125                self.kv_sdk.hset(self.room_peers_map_key, sub_key, value.clone(), Some(10000));
126                Ok(())
127            }
128            ClusterEndpointOutgoingEvent::InfoRemove => {
129                self.kv_sdk.hdel(self.room_peers_map_key, to_peer_sub_key(&self.peer_id));
130                Ok(())
131            }
132            ClusterEndpointOutgoingEvent::SubscribeRoomPeers => {
133                if self.room_peers_sub.is_none() {
134                    log::info!("[Atm0sClusterEndpoint] sub room peers {}", self.room_peers_map_key);
135                    self.kv_sdk.hsubscribe_raw(self.room_peers_map_key, self.sub_uuid, Some(10000), self.kv_peers_tx.clone());
136                    self.room_peers_sub = Some(());
137                } else {
138                    log::warn!("[Atm0sClusterEndpoint] sub room peers but already exist");
139                }
140                Ok(())
141            }
142            ClusterEndpointOutgoingEvent::UnsubscribeRoomPeers => {
143                if self.room_peers_sub.take().is_some() {
144                    log::info!("[Atm0sClusterEndpoint] unsub room peers {}", self.room_peers_map_key);
145                    self.kv_sdk.hunsubscribe_raw(self.room_peers_map_key, self.sub_uuid);
146                } else {
147                    log::warn!("[Atm0sClusterEndpoint] unsub room peers but not found");
148                }
149                Ok(())
150            }
151            ClusterEndpointOutgoingEvent::SubscribeRoomStreams => {
152                if self.peer_sub.is_empty() && self.room_streams_sub.is_none() {
153                    log::info!("[Atm0sClusterEndpoint] sub room streams {}", self.room_streams_map_key);
154                    self.kv_sdk.hsubscribe_raw(self.room_streams_map_key, self.sub_uuid, Some(10000), self.kv_streams_tx.clone());
155                    self.room_streams_sub = Some(());
156                } else {
157                    log::warn!("[Atm0sClusterEndpoint] sub room but already exist");
158                }
159                Ok(())
160            }
161            ClusterEndpointOutgoingEvent::UnsubscribeRoomStreams => {
162                if self.peer_sub.is_empty() && self.room_streams_sub.take().is_some() {
163                    log::info!("[Atm0sClusterEndpoint] unsub room streams {}", self.room_streams_map_key);
164                    self.kv_sdk.hunsubscribe_raw(self.room_streams_map_key, self.sub_uuid);
165                } else {
166                    log::warn!("[Atm0sClusterEndpoint] unsub room but not found");
167                }
168                Ok(())
169            }
170            ClusterEndpointOutgoingEvent::SubscribeSinglePeer(peer_id) => {
171                if self.room_streams_sub.is_none() && !self.peer_sub.contains_key(&peer_id) {
172                    log::warn!("[Atm0sClusterEndpoint] sub peer streams {}, key {}", peer_id, self.peer_key(&peer_id));
173                    self.kv_sdk.hsubscribe_raw(self.peer_key(&peer_id), self.sub_uuid, Some(10000), self.kv_streams_tx.clone());
174                    self.peer_sub.insert(peer_id, ());
175                } else {
176                    log::warn!("[Atm0sClusterEndpoint] sub peer but already exist {peer_id}");
177                }
178                Ok(())
179            }
180            ClusterEndpointOutgoingEvent::UnsubscribeSinglePeer(peer_id) => {
181                if self.room_streams_sub.is_none() && self.peer_sub.remove(&peer_id).is_some() {
182                    log::warn!("[Atm0sClusterEndpoint] unsub peer streams {}, key {}", peer_id, self.peer_key(&peer_id));
183                    self.kv_sdk.hunsubscribe_raw(self.peer_key(&peer_id), self.sub_uuid);
184                } else {
185                    log::warn!("[Atm0sClusterEndpoint] unsub peer but not found {peer_id}");
186                }
187                Ok(())
188            }
189            ClusterEndpointOutgoingEvent::LocalTrackEvent(track_id, event) => match event {
190                ClusterLocalTrackOutgoingEvent::RequestKeyFrame(kind) => {
191                    let value = match kind {
192                        RequestKeyframeKind::Fir => 1,
193                        RequestKeyframeKind::Pli => 2,
194                    } as i64;
195                    if let Some(consumers) = self.track_sub_map.get(&track_id) {
196                        for (_, consumer) in consumers {
197                            log::debug!("[Atm0sClusterEndpoint] send track feedback RequestKeyFrame {track_id} => {:?}", consumer.uuid());
198                            consumer.feedback(
199                                TrackFeedbackType::RequestKeyFrame as u8,
200                                FeedbackType::Number {
201                                    window_ms: 200,
202                                    info: NumberInfo {
203                                        count: 1,
204                                        sum: value,
205                                        max: value,
206                                        min: value,
207                                    },
208                                },
209                            )
210                        }
211                    } else {
212                        log::warn!("[Atm0sClusterEndpoint] send track feedback RequestKeyFrame but track not found {track_id}");
213                    }
214                    Ok(())
215                }
216                ClusterLocalTrackOutgoingEvent::LimitBitrate(bitrate) => {
217                    if let Some(consumers) = self.track_sub_map.get(&track_id) {
218                        for (_, consumer) in consumers {
219                            log::debug!("[Atm0sClusterEndpoint] send track feedback LimitBitrate({bitrate}) {track_id} => {:?}", consumer.uuid());
220                            consumer.feedback(
221                                TrackFeedbackType::LimitBitrate as u8,
222                                FeedbackType::Number {
223                                    window_ms: 200,
224                                    info: NumberInfo {
225                                        count: 1,
226                                        sum: bitrate as i64,
227                                        max: bitrate as i64,
228                                        min: bitrate as i64,
229                                    },
230                                },
231                            )
232                        }
233                    } else {
234                        log::warn!("[Atm0sClusterEndpoint] send track feedback LimitBitrate({bitrate}) but track not found {track_id}");
235                    }
236                    Ok(())
237                }
238                ClusterLocalTrackOutgoingEvent::Subscribe(peer_id, track_name) => {
239                    let track_uuid = ClusterTrackUuid::from_info(&self.room_id, &peer_id, &track_name);
240                    let consumer = self.pubsub_sdk.create_consumer_raw(*track_uuid as ChannelUuid, self.data_tx.clone());
241                    log::info!("[Atm0sClusterEndpoint] sub track {peer_id} {track_name} => track_uuid {} consumer_id {}", *track_uuid, consumer.uuid());
242                    self.consumer_map.insert(consumer.uuid(), track_id);
243                    let entry = self.track_sub_map.entry(track_id).or_insert_with(Default::default);
244                    entry.insert(track_uuid, consumer);
245                    Ok(())
246                }
247                ClusterLocalTrackOutgoingEvent::Unsubscribe(peer_id, track_name) => {
248                    let track_uuid = ClusterTrackUuid::from_info(&self.room_id, &peer_id, &track_name);
249                    if let Some(consumers) = self.track_sub_map.get_mut(&track_id) {
250                        if let Some(consumer) = consumers.remove(&track_uuid) {
251                            log::info!(
252                                "[Atm0sClusterEndpoint] unsub track {peer_id} {track_name} => track_uuid {} consumer_id {}",
253                                *track_uuid,
254                                consumer.uuid()
255                            );
256                            self.consumer_map.remove(&consumer.uuid());
257                            if consumers.is_empty() {
258                                self.track_sub_map.remove(&track_id);
259                            }
260                        } else {
261                            log::warn!("[Atm0sClusterEndpoint] unsub track but not found {peer_id} {track_name} => track_uuid {}", *track_uuid);
262                        }
263                    } else {
264                        log::warn!("[Atm0sClusterEndpoint] unsub track but not found {peer_id} {track_name} => track_uuid {}", *track_uuid);
265                    }
266                    Ok(())
267                }
268            },
269            ClusterEndpointOutgoingEvent::RemoteTrackEvent(track_id, track_uuid, event) => {
270                let channel_uuid = *track_uuid;
271                match event {
272                    ClusterRemoteTrackOutgoingEvent::TrackAdded(track_name, track_meta) => {
273                        if !self.track_pub.contains_key(&channel_uuid) {
274                            let (sub_key, value) = to_stream_value(&self.peer_id, &track_name, track_meta);
275                            self.track_pub
276                                .insert(channel_uuid, (track_id, self.pubsub_sdk.create_publisher_raw(channel_uuid, self.data_fb_tx.clone())));
277
278                            //set in room hashmap
279                            self.kv_sdk.hset(self.room_streams_map_key, sub_key, value.clone(), Some(10000));
280                            //set in peer hashmap
281                            self.kv_sdk.hset(self.peer_key(&self.peer_id), sub_key, value, Some(10000));
282                            log::info!(
283                                "[Atm0sClusterEndpoint] add track {} {track_name} => track_uuid {channel_uuid} track_id {track_id}, set sub-key in hash {} and {}",
284                                self.peer_id,
285                                self.room_streams_map_key,
286                                self.peer_key(&self.peer_id)
287                            );
288                        } else {
289                            log::warn!(
290                                "[Atm0sClusterEndpoint] add track but already exist {} {track_name} => track_uuid {channel_uuid} track_id {track_id}",
291                                self.peer_id
292                            );
293                        }
294                        Ok(())
295                    }
296                    ClusterRemoteTrackOutgoingEvent::TrackMedia(media_packet) => {
297                        if let Some((_, publisher)) = self.track_pub.get(&channel_uuid) {
298                            if let Ok(buf) = TrackData::Media(media_packet).try_into() {
299                                publisher.send(buf);
300                            }
301                        } else {
302                            log::warn!("[Atm0sClusterEndpoint] send track media but track not found {}", channel_uuid);
303                        }
304                        Ok(())
305                    }
306                    ClusterRemoteTrackOutgoingEvent::TrackStats(stats) => {
307                        if let Some((_, publisher)) = self.track_pub.get(&channel_uuid) {
308                            if let Ok(buf) = TrackData::Stats(stats).try_into() {
309                                publisher.send(buf);
310                            }
311                        } else {
312                            log::warn!("[Atm0sClusterEndpoint] send track stats but track not found {}", channel_uuid);
313                        }
314                        Ok(())
315                    }
316                    ClusterRemoteTrackOutgoingEvent::TrackRemoved(track_name) => {
317                        if self.track_pub.remove(&channel_uuid).is_some() {
318                            let sub_key = to_stream_sub_key(&self.peer_id, &track_name);
319
320                            //del in room hashmap
321                            self.kv_sdk.hdel(self.room_streams_map_key, sub_key);
322
323                            //del in peer hashmap
324                            self.kv_sdk.hdel(self.peer_key(&self.peer_id), sub_key);
325                            log::info!(
326                                "[Atm0sClusterEndpoint] delete track {} {track_name} => track_uuid {channel_uuid} track_id {track_id}, del sub-key in hash {} and {}",
327                                self.peer_id,
328                                self.room_streams_map_key,
329                                self.peer_key(&self.peer_id)
330                            );
331                        } else {
332                            log::warn!(
333                                "[Atm0sClusterEndpoint] delete track but not found {} {track_name} => track_uuid {channel_uuid} track_id {track_id}",
334                                self.peer_id
335                            );
336                        }
337                        Ok(())
338                    }
339                }
340            }
341            ClusterEndpointOutgoingEvent::MediaEndpointLog(event) => {
342                log::info!("[Atm0sClusterEndpoint] log event {:?}", event);
343                let emitter = self.rpc_emitter.clone();
344                async_std::task::spawn_local(async move {
345                    emitter
346                        .request::<_, MediaEndpointLogResponse>(CONNECTOR_SERVICE, RouteRule::ToService(0), RPC_MEDIA_ENDPOINT_LOG, event, 5000)
347                        .await
348                        .log_error("Should send event to Connector Service");
349                });
350                Ok(())
351            }
352        }
353    }
354
355    async fn recv(&mut self) -> Result<ClusterEndpointIncomingEvent, ClusterEndpointError> {
356        loop {
357            select! {
358                event = self.kv_peers_rx.recv().fuse() => match event {
359                    Ok((_key, sub_key, value, _ver, _source)) => {
360                        if sub_key == to_peer_sub_key(&self.peer_id) { //update myself => don't care
361                            continue;
362                        }
363                        if let Some(value) = value { //add or update
364                            if let Some((peer, meta)) = from_peer_value(sub_key, &value) {
365                                if self.remote_peer_cached.insert(sub_key, peer.clone()).is_some() {
366                                    log::info!("[Atm0sClusterEndpoint] on room update peer {}", peer);
367                                    return Ok(ClusterEndpointIncomingEvent::PeerUpdated(peer, meta));
368                                } else {
369                                    log::info!("[Atm0sClusterEndpoint] on room add peer {}", peer);
370                                    return Ok(ClusterEndpointIncomingEvent::PeerAdded(peer, meta));
371                                }
372                            }
373                        } else { //delete
374                            if let Some(peer) = self.remote_peer_cached.remove(&sub_key) {
375                                log::info!("[Atm0sClusterEndpoint] on room remove peer {}", peer);
376                                return Ok(ClusterEndpointIncomingEvent::PeerRemoved(peer));
377                            }
378                        }
379                    }
380                    Err(_e) => {
381                        return Err(ClusterEndpointError::InternalError);
382                    }
383                },
384                event = self.kv_streams_rx.recv().fuse() => match event {
385                    Ok((_key, sub_key, value, _ver, _source)) => {
386                        if let Some(value) = value { //add or update
387                            if let Some((peer, track, meta)) = from_stream_value(sub_key, &value) {
388                                if self.remote_track_cached.insert(sub_key, (peer.clone(), track.clone())).is_some() {
389                                    log::info!("[Atm0sClusterEndpoint] on room update stream {} {}", peer, track);
390                                    return Ok(ClusterEndpointIncomingEvent::PeerTrackUpdated(peer, track, meta));
391                                } else {
392                                    log::info!("[Atm0sClusterEndpoint] on room add stream {} {}", peer, track);
393                                    return Ok(ClusterEndpointIncomingEvent::PeerTrackAdded(peer, track, meta));
394                                }
395                            }
396                        } else { //delete
397                            if let Some((peer, track)) = self.remote_track_cached.remove(&sub_key) {
398                                log::info!("[Atm0sClusterEndpoint] on room remove {} {}", peer, track);
399                                return Ok(ClusterEndpointIncomingEvent::PeerTrackRemoved(peer, track));
400                            }
401                        }
402                    }
403                    Err(_e) => {
404                        return Err(ClusterEndpointError::InternalError);
405                    }
406                },
407                event = self.data_fb_rx.recv().fuse() => match event {
408                    Ok(fb) => {
409                        if let Some((track_id, _)) = self.track_pub.get(&fb.channel.uuid()) {
410                            log::trace!("[Atm0sClusterEndpoint] recv track feedback {track_id} => {:?}", fb);
411                            match (TrackFeedbackType::try_from(fb.id), fb.feedback_type) {
412                                (Ok(TrackFeedbackType::LimitBitrate), FeedbackType::Number { window_ms: _, info }) => {
413                                    return Ok(ClusterEndpointIncomingEvent::RemoteTrackEvent(*track_id, ClusterRemoteTrackIncomingEvent::RequestLimitBitrate(info.max as u32)));
414                                },
415                                (Ok(TrackFeedbackType::RequestKeyFrame), FeedbackType::Number { window_ms: _, info }) => {
416                                    let kind = if info.sum > info.count as i64 { //mean has more than 1 has type2 => Pli
417                                        transport::RequestKeyframeKind::Pli
418                                    } else {
419                                        transport::RequestKeyframeKind::Fir
420                                    };
421                                    return Ok(ClusterEndpointIncomingEvent::RemoteTrackEvent(*track_id, ClusterRemoteTrackIncomingEvent::RequestKeyFrame(kind)));
422                                },
423                                _ => {}
424                            }
425                        } else {
426                            log::warn!("[Atm0sClusterEndpoint] recv track feedback but track not found {}", fb.channel.uuid());
427                        }
428                    },
429                    Err(_e) => {
430                        return Err(ClusterEndpointError::InternalError);
431                    }
432                },
433                event = self.data_rx.recv().fuse() => match event {
434                    Ok((sub_id, _node_id, channel_uuid, data)) => {
435                        if let Some(track_id) = self.consumer_map.get(&sub_id) {
436                            log::trace!("[Atm0sClusterEndpoint] recv track data {sub_id} => {track_id}");
437                            match TrackData::try_from(data) {
438                                Ok(TrackData::Media(media_packet)) => {
439                                    return Ok(ClusterEndpointIncomingEvent::LocalTrackEvent(*track_id, ClusterLocalTrackIncomingEvent::MediaPacket(channel_uuid.into(), media_packet)));
440                                },
441                                Ok(TrackData::Stats(stats)) => {
442                                    return Ok(ClusterEndpointIncomingEvent::LocalTrackEvent(*track_id, ClusterLocalTrackIncomingEvent::MediaStats(channel_uuid.into(), stats)));
443                                },
444                                Err(_e) => {
445
446                                }
447                            }
448                        } else {
449                            log::warn!("[Atm0sClusterEndpoint] recv track data but track not found {}", sub_id);
450                        }
451                    },
452                    Err(_e) => {
453                        return Err(ClusterEndpointError::InternalError);
454                    }
455                }
456            }
457        }
458    }
459}
460
461impl Drop for ClusterEndpointSdn {
462    fn drop(&mut self) {
463        assert_eq!(self.consumer_map.len(), 0);
464        assert_eq!(self.track_sub_map.len(), 0);
465        assert_eq!(self.peer_sub.len(), 0);
466        assert_eq!(self.track_pub.len(), 0);
467        assert_eq!(self.room_streams_sub, None);
468    }
469}
470
471//TODO test this