streamhub/
define.rs

1use chrono::{DateTime, Local};
2use serde::Deserialize;
3use serde_json::Value;
4use xflv::define::{AacProfile, AvcCodecId, AvcLevel, AvcProfile, SoundFormat};
5
6use crate::utils;
7
8use {
9    super::errors::StreamHubError,
10    crate::statistics::StatisticsStream,
11    crate::stream::StreamIdentifier,
12    async_trait::async_trait,
13    bytes::BytesMut,
14    serde::ser::SerializeStruct,
15    serde::Serialize,
16    serde::Serializer,
17    std::fmt,
18    std::sync::Arc,
19    tokio::sync::{broadcast, mpsc, oneshot},
20    utils::Uuid,
21};
22
23/* Subscribe streams from stream hub */
24#[derive(Debug, Serialize, Clone, Eq, PartialEq)]
25pub enum SubscribeType {
26    /* Remote client request pulling(play) a rtmp stream.*/
27    RtmpPull,
28    /* Remote request to play httpflv triggers remux from RTMP to httpflv. */
29    RtmpRemux2HttpFlv,
30    /* The publishing of RTMP stream triggers remuxing from RTMP to HLS protocol.(NOTICE:It is not triggerred by players.)*/
31    RtmpRemux2Hls,
32    /* Relay(Push) local RTMP stream from stream hub to other RTMP nodes.*/
33    RtmpRelay,
34    /* Remote client request pulling(play) a rtsp stream.*/
35    RtspPull,
36    /* The publishing of RTSP stream triggers remuxing from RTSP to RTMP protocol.*/
37    RtspRemux2Rtmp,
38    /* Relay(Push) local RTSP stream to other RTSP nodes.*/
39    RtspRelay,
40    /* Remote client request pulling(play) stream through whep.*/
41    WhepPull,
42    /* Remuxing webrtc stream to RTMP */
43    WebRTCRemux2Rtmp,
44    /* Relay(Push) the local webRTC stream to other nodes using Whip.*/
45    WhipRelay,
46    /* Pull rtp stream by subscribing from stream hub.*/
47    RtpPull,
48}
49
50/* Publish streams to stream hub */
51#[derive(Debug, Serialize, Clone, Eq, PartialEq)]
52pub enum PublishType {
53    /* Receive rtmp stream from remote push client. */
54    RtmpPush,
55    /* Relay(Pull) remote RTMP stream to local stream hub. */
56    RtmpRelay,
57    /* Receive rtsp stream from remote push client */
58    RtspPush,
59    /* Relay(Pull) remote RTSP stream to local stream hub. */
60    RtspRelay,
61    /* Receive whip stream from remote push client. */
62    WhipPush,
63    /* Relay(Pull) remote WebRTC stream to local stream hub using Whep. */
64    WhepRelay,
65    /* It used for publishing raw rtp data of rtsp/whbrtc(whip) */
66    RtpPush,
67}
68
69#[derive(Debug, Serialize, Clone)]
70pub struct NotifyInfo {
71    pub request_url: String,
72    pub remote_addr: String,
73}
74
75#[derive(Debug, Clone)]
76pub struct SubscriberInfo {
77    pub id: Uuid,
78    pub sub_type: SubscribeType,
79    pub notify_info: NotifyInfo,
80    pub sub_data_type: SubDataType,
81}
82
83impl Serialize for SubscriberInfo {
84    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
85    where
86        S: Serializer,
87    {
88        // 3 is the number of fields in the struct.
89        let mut state = serializer.serialize_struct("SubscriberInfo", 3)?;
90
91        state.serialize_field("id", &self.id.to_string())?;
92        state.serialize_field("sub_type", &self.sub_type)?;
93        state.serialize_field("notify_info", &self.notify_info)?;
94        state.end()
95    }
96}
97
98#[derive(Debug, Clone)]
99pub struct PublisherInfo {
100    pub id: Uuid,
101    pub pub_type: PublishType,
102    pub pub_data_type: PubDataType,
103    pub notify_info: NotifyInfo,
104}
105
106impl Serialize for PublisherInfo {
107    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
108    where
109        S: Serializer,
110    {
111        // 3 is the number of fields in the struct.
112        let mut state = serializer.serialize_struct("PublisherInfo", 3)?;
113
114        state.serialize_field("id", &self.id.to_string())?;
115        state.serialize_field("pub_type", &self.pub_type)?;
116        state.serialize_field("notify_info", &self.notify_info)?;
117        state.end()
118    }
119}
120
/// Video codecs distinguished by the stream hub.
///
/// Derives `Debug` and `Eq` (in addition to the existing `Clone` and
/// `PartialEq`) so values can be logged and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VideoCodecType {
    H264,
    H265,
}

/// Clock rates and video codec describing a stream's media.
///
/// Carried by the `FrameData::MediaInfo` variant. Derives `Debug`,
/// `PartialEq` and `Eq` so it can be logged and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MediaInfo {
    /// Clock rate of the audio track.
    pub audio_clock_rate: u32,
    /// Clock rate of the video track.
    pub video_clock_rate: u32,
    /// Codec of the video track.
    pub vcodec: VideoCodecType,
}
133
134#[derive(Clone)]
135pub enum FrameData {
136    Video { timestamp: u32, data: BytesMut },
137    Audio { timestamp: u32, data: BytesMut },
138    MetaData { timestamp: u32, data: BytesMut },
139    MediaInfo { media_info: MediaInfo },
140}
141
142//Used to pass rtp raw data.
143#[derive(Clone)]
144pub enum PacketData {
145    Video { timestamp: u32, data: BytesMut },
146    Audio { timestamp: u32, data: BytesMut },
147}
148
/// Holds data that needs to be transferred between client/server sessions.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so values can
/// be logged and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Information {
    /// An SDP description, stored as text.
    Sdp { data: String },
}
154
155//used to transfer a/v frame between different protocols(rtmp/rtsp/webrtc/http-flv/hls)
156//or send a/v frame data from publisher to subscribers.
157pub type FrameDataSender = mpsc::UnboundedSender<FrameData>;
158pub type FrameDataReceiver = mpsc::UnboundedReceiver<FrameData>;
159
160//used to transfer rtp packet data,it includles the following directions:
161// rtsp(publisher)->stream hub->rtsp(subscriber)
162// webrtc(publisher whip)->stream hub->webrtc(subscriber whep)
163pub type PacketDataSender = mpsc::UnboundedSender<PacketData>;
164pub type PacketDataReceiver = mpsc::UnboundedReceiver<PacketData>;
165
166pub type InformationSender = mpsc::UnboundedSender<Information>;
167pub type InformationReceiver = mpsc::UnboundedReceiver<Information>;
168
169pub type StreamHubEventSender = mpsc::UnboundedSender<StreamHubEvent>;
170pub type StreamHubEventReceiver = mpsc::UnboundedReceiver<StreamHubEvent>;
171
172pub type BroadcastEventSender = broadcast::Sender<BroadcastEvent>;
173pub type BroadcastEventReceiver = broadcast::Receiver<BroadcastEvent>;
174
175pub type TransceiverEventSender = mpsc::UnboundedSender<TransceiverEvent>;
176pub type TransceiverEventReceiver = mpsc::UnboundedReceiver<TransceiverEvent>;
177
178pub type StatisticDataSender = mpsc::UnboundedSender<StatisticData>;
179pub type StatisticDataReceiver = mpsc::UnboundedReceiver<StatisticData>;
180
181pub type StatisticStreamSender = mpsc::UnboundedSender<StatisticsStream>;
182pub type StatisticStreamReceiver = mpsc::UnboundedReceiver<StatisticsStream>;
183
184pub type StatisticApiResultSender = oneshot::Sender<Value>;
185pub type StatisticApiResultReceiver = oneshot::Receiver<Value>;
186
187pub type SubEventExecuteResultSender =
188    oneshot::Sender<Result<(DataReceiver, Option<StatisticDataSender>), StreamHubError>>;
189pub type PubEventExecuteResultSender = oneshot::Sender<
190    Result<
191        (
192            Option<FrameDataSender>,
193            Option<PacketDataSender>,
194            Option<StatisticDataSender>,
195        ),
196        StreamHubError,
197    >,
198>;
199// The trait bound `BroadcastEvent: Clone` should be satisfied, so here we cannot use oneshot.
200pub type BroadcastEventExecuteResultSender = mpsc::Sender<Result<(), StreamHubError>>;
201pub type ApiRelayStreamResultSender = oneshot::Sender<Result<(), StreamHubError>>;
202pub type TransceiverEventExecuteResultSender = oneshot::Sender<StatisticDataSender>;
203
204#[async_trait]
205pub trait TStreamHandler: Send + Sync {
206    async fn send_prior_data(
207        &self,
208        sender: DataSender,
209        sub_type: SubscribeType,
210    ) -> Result<(), StreamHubError>;
211    async fn get_statistic_data(&self) -> Option<StatisticsStream>;
212    async fn send_information(&self, sender: InformationSender);
213}
214
215//A publisher can publish one or two kinds of av stream at a time.
216pub struct DataReceiver {
217    pub frame_receiver: Option<FrameDataReceiver>,
218    pub packet_receiver: Option<PacketDataReceiver>,
219}
220
221//A subscriber only needs to subscribe to one type of stream at a time
222#[derive(Debug, Clone)]
223pub enum DataSender {
224    Frame { sender: FrameDataSender },
225    Packet { sender: PacketDataSender },
226}
227//we can only sub one kind of stream.
228#[derive(Debug, Clone, Serialize)]
229pub enum SubDataType {
230    Frame,
231    Packet,
232}
233//we can pub frame or packet or both.
234#[derive(Debug, Clone, Serialize)]
235pub enum PubDataType {
236    Frame,
237    Packet,
238    Both,
239}
240
241#[derive(Clone, Serialize, Debug)]
242pub enum StreamHubEventMessage {
243    Subscribe {
244        identifier: StreamIdentifier,
245        info: SubscriberInfo,
246    },
247    UnSubscribe {
248        identifier: StreamIdentifier,
249        info: SubscriberInfo,
250    },
251    Publish {
252        identifier: StreamIdentifier,
253        info: PublisherInfo,
254    },
255    UnPublish {
256        identifier: StreamIdentifier,
257        info: PublisherInfo,
258    },
259    NotSupport {},
260}
261
262//we can pub frame or packet or both.
263#[derive(Debug, Clone, Serialize, Deserialize)]
264pub enum RelayType {
265    Pull,
266    Push,
267}
268
269#[derive(Serialize)]
270pub enum StreamHubEvent {
271    Subscribe {
272        identifier: StreamIdentifier,
273        info: SubscriberInfo,
274        #[serde(skip_serializing)]
275        result_sender: SubEventExecuteResultSender,
276    },
277    UnSubscribe {
278        identifier: StreamIdentifier,
279        info: SubscriberInfo,
280    },
281    Publish {
282        identifier: StreamIdentifier,
283        info: PublisherInfo,
284        #[serde(skip_serializing)]
285        result_sender: PubEventExecuteResultSender,
286        #[serde(skip_serializing)]
287        stream_handler: Arc<dyn TStreamHandler>,
288    },
289    UnPublish {
290        identifier: StreamIdentifier,
291        info: PublisherInfo,
292    },
293    #[serde(skip_serializing)]
294    ApiStatistic {
295        top_n: Option<usize>,
296        identifier: Option<StreamIdentifier>,
297        uuid: Option<Uuid>,
298        result_sender: StatisticApiResultSender,
299    },
300    #[serde(skip_serializing)]
301    ApiKickClient { id: Uuid },
302    #[serde(skip_serializing)]
303    ApiStartRelayStream {
304        id: String,
305        identifier: StreamIdentifier,
306        server_address: String,
307        relay_type: RelayType,
308        result_sender: ApiRelayStreamResultSender,
309    },
310    #[serde(skip_serializing)]
311    ApiStopRelayStream {
312        id: String,
313        relay_type: RelayType,
314        result_sender: ApiRelayStreamResultSender,
315    },
316    #[serde(skip_serializing)]
317    Request {
318        identifier: StreamIdentifier,
319        sender: InformationSender,
320    },
321}
322
323impl StreamHubEvent {
324    pub fn to_message(&self) -> StreamHubEventMessage {
325        match self {
326            StreamHubEvent::Subscribe {
327                identifier,
328                info,
329                result_sender: _result_sender,
330            } => StreamHubEventMessage::Subscribe {
331                identifier: identifier.clone(),
332                info: info.clone(),
333            },
334            StreamHubEvent::UnSubscribe { identifier, info } => {
335                StreamHubEventMessage::UnSubscribe {
336                    identifier: identifier.clone(),
337                    info: info.clone(),
338                }
339            }
340            StreamHubEvent::Publish {
341                identifier,
342                info,
343                result_sender: _result_sender,
344                stream_handler: _stream_handler,
345            } => StreamHubEventMessage::Publish {
346                identifier: identifier.clone(),
347                info: info.clone(),
348            },
349            StreamHubEvent::UnPublish { identifier, info } => StreamHubEventMessage::UnPublish {
350                identifier: identifier.clone(),
351                info: info.clone(),
352            },
353            _ => StreamHubEventMessage::NotSupport {},
354        }
355    }
356}
357
358#[derive(Debug)]
359pub enum TransceiverEvent {
360    Subscribe {
361        sender: DataSender,
362        info: SubscriberInfo,
363        result_sender: TransceiverEventExecuteResultSender,
364    },
365    UnSubscribe {
366        info: SubscriberInfo,
367    },
368    UnPublish {},
369
370    Api {
371        sender: StatisticStreamSender,
372        uuid: Option<Uuid>,
373    },
374    Request {
375        sender: InformationSender,
376    },
377}
378
379impl fmt::Display for TransceiverEvent {
380    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
381        write!(f, "{:?}", *self)
382    }
383}
384
385#[derive(Debug, Clone)]
386pub enum BroadcastEvent {
387    /*Need publish(push) a stream to other rtmp server*/
388    Publish {
389        identifier: StreamIdentifier,
390    },
391    UnPublish {
392        identifier: StreamIdentifier,
393    },
394    /*Need subscribe(pull) a stream from other rtmp server*/
395    Subscribe {
396        id: String,
397        identifier: StreamIdentifier,
398        server_address: Option<String>,
399        result_sender: Option<BroadcastEventExecuteResultSender>,
400    },
401    UnSubscribe {
402        id: String,
403        result_sender: Option<BroadcastEventExecuteResultSender>,
404        //identifier: StreamIdentifier,
405        //server_address: Option<String>,
406    },
407}
408
409pub enum StatisticData {
410    AudioCodec {
411        sound_format: SoundFormat,
412        profile: AacProfile,
413        samplerate: u32,
414        channels: u8,
415    },
416    VideoCodec {
417        codec: AvcCodecId,
418        profile: AvcProfile,
419        level: AvcLevel,
420        width: u32,
421        height: u32,
422    },
423    Audio {
424        uuid: Option<Uuid>,
425        data_size: usize,
426        aac_packet_type: u8,
427        duration: usize,
428    },
429    Video {
430        uuid: Option<Uuid>,
431        data_size: usize,
432        frame_count: usize,
433        is_key_frame: Option<bool>,
434        duration: usize,
435    },
436    Publisher {
437        id: Uuid,
438        remote_addr: String,
439        start_time: DateTime<Local>,
440    },
441    Subscriber {
442        id: Uuid,
443        remote_addr: String,
444        sub_type: SubscribeType,
445        start_time: DateTime<Local>,
446    },
447}