1use chrono::{DateTime, Local};
2use serde::Deserialize;
3use serde_json::Value;
4use xflv::define::{AacProfile, AvcCodecId, AvcLevel, AvcProfile, SoundFormat};
5
6use crate::utils;
7
8use {
9 super::errors::StreamHubError,
10 crate::statistics::StatisticsStream,
11 crate::stream::StreamIdentifier,
12 async_trait::async_trait,
13 bytes::BytesMut,
14 serde::ser::SerializeStruct,
15 serde::Serialize,
16 serde::Serializer,
17 std::fmt,
18 std::sync::Arc,
19 tokio::sync::{broadcast, mpsc, oneshot},
20 utils::Uuid,
21};
22
23#[derive(Debug, Serialize, Clone, Eq, PartialEq)]
25pub enum SubscribeType {
26 RtmpPull,
28 RtmpRemux2HttpFlv,
30 RtmpRemux2Hls,
32 RtmpRelay,
34 RtspPull,
36 RtspRemux2Rtmp,
38 RtspRelay,
40 WhepPull,
42 WebRTCRemux2Rtmp,
44 WhipRelay,
46 RtpPull,
48}
49
50#[derive(Debug, Serialize, Clone, Eq, PartialEq)]
52pub enum PublishType {
53 RtmpPush,
55 RtmpRelay,
57 RtspPush,
59 RtspRelay,
61 WhipPush,
63 WhepRelay,
65 RtpPush,
67}
68
69#[derive(Debug, Serialize, Clone)]
70pub struct NotifyInfo {
71 pub request_url: String,
72 pub remote_addr: String,
73}
74
75#[derive(Debug, Clone)]
76pub struct SubscriberInfo {
77 pub id: Uuid,
78 pub sub_type: SubscribeType,
79 pub notify_info: NotifyInfo,
80 pub sub_data_type: SubDataType,
81}
82
83impl Serialize for SubscriberInfo {
84 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
85 where
86 S: Serializer,
87 {
88 let mut state = serializer.serialize_struct("SubscriberInfo", 3)?;
90
91 state.serialize_field("id", &self.id.to_string())?;
92 state.serialize_field("sub_type", &self.sub_type)?;
93 state.serialize_field("notify_info", &self.notify_info)?;
94 state.end()
95 }
96}
97
98#[derive(Debug, Clone)]
99pub struct PublisherInfo {
100 pub id: Uuid,
101 pub pub_type: PublishType,
102 pub pub_data_type: PubDataType,
103 pub notify_info: NotifyInfo,
104}
105
106impl Serialize for PublisherInfo {
107 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
108 where
109 S: Serializer,
110 {
111 let mut state = serializer.serialize_struct("PublisherInfo", 3)?;
113
114 state.serialize_field("id", &self.id.to_string())?;
115 state.serialize_field("pub_type", &self.pub_type)?;
116 state.serialize_field("notify_info", &self.notify_info)?;
117 state.end()
118 }
119}
120
/// Video codec of a stream, as recorded in `MediaInfo::vcodec`.
///
/// Derives `Debug` so values can be logged and used in `assert_eq!`, and `Eq`
/// because equality on these field-less variants is total.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VideoCodecType {
    H264,
    H265,
}
126
127#[derive(Clone)]
128pub struct MediaInfo {
129 pub audio_clock_rate: u32,
130 pub video_clock_rate: u32,
131 pub vcodec: VideoCodecType,
132}
133
134#[derive(Clone)]
135pub enum FrameData {
136 Video { timestamp: u32, data: BytesMut },
137 Audio { timestamp: u32, data: BytesMut },
138 MetaData { timestamp: u32, data: BytesMut },
139 MediaInfo { media_info: MediaInfo },
140}
141
142#[derive(Clone)]
144pub enum PacketData {
145 Video { timestamp: u32, data: BytesMut },
146 Audio { timestamp: u32, data: BytesMut },
147}
148
/// Out-of-band information delivered over an `InformationSender` channel.
///
/// Derives `Debug` so values can be logged and inspected in tests.
#[derive(Debug, Clone)]
pub enum Information {
    /// An SDP document in text form.
    Sdp { data: String },
}
154
155pub type FrameDataSender = mpsc::UnboundedSender<FrameData>;
158pub type FrameDataReceiver = mpsc::UnboundedReceiver<FrameData>;
159
160pub type PacketDataSender = mpsc::UnboundedSender<PacketData>;
164pub type PacketDataReceiver = mpsc::UnboundedReceiver<PacketData>;
165
166pub type InformationSender = mpsc::UnboundedSender<Information>;
167pub type InformationReceiver = mpsc::UnboundedReceiver<Information>;
168
169pub type StreamHubEventSender = mpsc::UnboundedSender<StreamHubEvent>;
170pub type StreamHubEventReceiver = mpsc::UnboundedReceiver<StreamHubEvent>;
171
172pub type BroadcastEventSender = broadcast::Sender<BroadcastEvent>;
173pub type BroadcastEventReceiver = broadcast::Receiver<BroadcastEvent>;
174
175pub type TransceiverEventSender = mpsc::UnboundedSender<TransceiverEvent>;
176pub type TransceiverEventReceiver = mpsc::UnboundedReceiver<TransceiverEvent>;
177
178pub type StatisticDataSender = mpsc::UnboundedSender<StatisticData>;
179pub type StatisticDataReceiver = mpsc::UnboundedReceiver<StatisticData>;
180
181pub type StatisticStreamSender = mpsc::UnboundedSender<StatisticsStream>;
182pub type StatisticStreamReceiver = mpsc::UnboundedReceiver<StatisticsStream>;
183
184pub type StatisticApiResultSender = oneshot::Sender<Value>;
185pub type StatisticApiResultReceiver = oneshot::Receiver<Value>;
186
187pub type SubEventExecuteResultSender =
188 oneshot::Sender<Result<(DataReceiver, Option<StatisticDataSender>), StreamHubError>>;
189pub type PubEventExecuteResultSender = oneshot::Sender<
190 Result<
191 (
192 Option<FrameDataSender>,
193 Option<PacketDataSender>,
194 Option<StatisticDataSender>,
195 ),
196 StreamHubError,
197 >,
198>;
199pub type BroadcastEventExecuteResultSender = mpsc::Sender<Result<(), StreamHubError>>;
201pub type ApiRelayStreamResultSender = oneshot::Sender<Result<(), StreamHubError>>;
202pub type TransceiverEventExecuteResultSender = oneshot::Sender<StatisticDataSender>;
203
204#[async_trait]
205pub trait TStreamHandler: Send + Sync {
206 async fn send_prior_data(
207 &self,
208 sender: DataSender,
209 sub_type: SubscribeType,
210 ) -> Result<(), StreamHubError>;
211 async fn get_statistic_data(&self) -> Option<StatisticsStream>;
212 async fn send_information(&self, sender: InformationSender);
213}
214
215pub struct DataReceiver {
217 pub frame_receiver: Option<FrameDataReceiver>,
218 pub packet_receiver: Option<PacketDataReceiver>,
219}
220
221#[derive(Debug, Clone)]
223pub enum DataSender {
224 Frame { sender: FrameDataSender },
225 Packet { sender: PacketDataSender },
226}
227#[derive(Debug, Clone, Serialize)]
229pub enum SubDataType {
230 Frame,
231 Packet,
232}
233#[derive(Debug, Clone, Serialize)]
235pub enum PubDataType {
236 Frame,
237 Packet,
238 Both,
239}
240
241#[derive(Clone, Serialize, Debug)]
242pub enum StreamHubEventMessage {
243 Subscribe {
244 identifier: StreamIdentifier,
245 info: SubscriberInfo,
246 },
247 UnSubscribe {
248 identifier: StreamIdentifier,
249 info: SubscriberInfo,
250 },
251 Publish {
252 identifier: StreamIdentifier,
253 info: PublisherInfo,
254 },
255 UnPublish {
256 identifier: StreamIdentifier,
257 info: PublisherInfo,
258 },
259 NotSupport {},
260}
261
262#[derive(Debug, Clone, Serialize, Deserialize)]
264pub enum RelayType {
265 Pull,
266 Push,
267}
268
269#[derive(Serialize)]
270pub enum StreamHubEvent {
271 Subscribe {
272 identifier: StreamIdentifier,
273 info: SubscriberInfo,
274 #[serde(skip_serializing)]
275 result_sender: SubEventExecuteResultSender,
276 },
277 UnSubscribe {
278 identifier: StreamIdentifier,
279 info: SubscriberInfo,
280 },
281 Publish {
282 identifier: StreamIdentifier,
283 info: PublisherInfo,
284 #[serde(skip_serializing)]
285 result_sender: PubEventExecuteResultSender,
286 #[serde(skip_serializing)]
287 stream_handler: Arc<dyn TStreamHandler>,
288 },
289 UnPublish {
290 identifier: StreamIdentifier,
291 info: PublisherInfo,
292 },
293 #[serde(skip_serializing)]
294 ApiStatistic {
295 top_n: Option<usize>,
296 identifier: Option<StreamIdentifier>,
297 uuid: Option<Uuid>,
298 result_sender: StatisticApiResultSender,
299 },
300 #[serde(skip_serializing)]
301 ApiKickClient { id: Uuid },
302 #[serde(skip_serializing)]
303 ApiStartRelayStream {
304 id: String,
305 identifier: StreamIdentifier,
306 server_address: String,
307 relay_type: RelayType,
308 result_sender: ApiRelayStreamResultSender,
309 },
310 #[serde(skip_serializing)]
311 ApiStopRelayStream {
312 id: String,
313 relay_type: RelayType,
314 result_sender: ApiRelayStreamResultSender,
315 },
316 #[serde(skip_serializing)]
317 Request {
318 identifier: StreamIdentifier,
319 sender: InformationSender,
320 },
321}
322
323impl StreamHubEvent {
324 pub fn to_message(&self) -> StreamHubEventMessage {
325 match self {
326 StreamHubEvent::Subscribe {
327 identifier,
328 info,
329 result_sender: _result_sender,
330 } => StreamHubEventMessage::Subscribe {
331 identifier: identifier.clone(),
332 info: info.clone(),
333 },
334 StreamHubEvent::UnSubscribe { identifier, info } => {
335 StreamHubEventMessage::UnSubscribe {
336 identifier: identifier.clone(),
337 info: info.clone(),
338 }
339 }
340 StreamHubEvent::Publish {
341 identifier,
342 info,
343 result_sender: _result_sender,
344 stream_handler: _stream_handler,
345 } => StreamHubEventMessage::Publish {
346 identifier: identifier.clone(),
347 info: info.clone(),
348 },
349 StreamHubEvent::UnPublish { identifier, info } => StreamHubEventMessage::UnPublish {
350 identifier: identifier.clone(),
351 info: info.clone(),
352 },
353 _ => StreamHubEventMessage::NotSupport {},
354 }
355 }
356}
357
358#[derive(Debug)]
359pub enum TransceiverEvent {
360 Subscribe {
361 sender: DataSender,
362 info: SubscriberInfo,
363 result_sender: TransceiverEventExecuteResultSender,
364 },
365 UnSubscribe {
366 info: SubscriberInfo,
367 },
368 UnPublish {},
369
370 Api {
371 sender: StatisticStreamSender,
372 uuid: Option<Uuid>,
373 },
374 Request {
375 sender: InformationSender,
376 },
377}
378
379impl fmt::Display for TransceiverEvent {
380 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
381 write!(f, "{:?}", *self)
382 }
383}
384
385#[derive(Debug, Clone)]
386pub enum BroadcastEvent {
387 Publish {
389 identifier: StreamIdentifier,
390 },
391 UnPublish {
392 identifier: StreamIdentifier,
393 },
394 Subscribe {
396 id: String,
397 identifier: StreamIdentifier,
398 server_address: Option<String>,
399 result_sender: Option<BroadcastEventExecuteResultSender>,
400 },
401 UnSubscribe {
402 id: String,
403 result_sender: Option<BroadcastEventExecuteResultSender>,
404 },
407}
408
409pub enum StatisticData {
410 AudioCodec {
411 sound_format: SoundFormat,
412 profile: AacProfile,
413 samplerate: u32,
414 channels: u8,
415 },
416 VideoCodec {
417 codec: AvcCodecId,
418 profile: AvcProfile,
419 level: AvcLevel,
420 width: u32,
421 height: u32,
422 },
423 Audio {
424 uuid: Option<Uuid>,
425 data_size: usize,
426 aac_packet_type: u8,
427 duration: usize,
428 },
429 Video {
430 uuid: Option<Uuid>,
431 data_size: usize,
432 frame_count: usize,
433 is_key_frame: Option<bool>,
434 duration: usize,
435 },
436 Publisher {
437 id: Uuid,
438 remote_addr: String,
439 start_time: DateTime<Local>,
440 },
441 Subscriber {
442 id: Uuid,
443 remote_addr: String,
444 sub_type: SubscribeType,
445 start_time: DateTime<Local>,
446 },
447}