rtmp_rs/registry/
entry.rs1use std::sync::atomic::{AtomicU32, Ordering};
6use std::time::Instant;
7
8use tokio::sync::broadcast;
9
10use crate::media::flv::FlvTag;
11use crate::media::gop::GopBuffer;
12
13use super::config::RegistryConfig;
14use super::frame::BroadcastFrame;
15
/// Lifecycle state of a stream entry.
///
/// A freshly constructed `StreamEntry` starts out as [`StreamState::Idle`];
/// nothing in this file performs any other transition.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamState {
    /// NOTE(review): presumably a publisher is attached and sending — confirm
    /// against the registry code that sets this state.
    Active,
    /// NOTE(review): presumably the publisher has disconnected and the entry
    /// is being kept alive for a reconnect window — confirm against the registry.
    GracePeriod,
    /// State assigned by `StreamEntry::new`.
    Idle,
}
26
27pub struct StreamEntry {
29 pub gop_buffer: GopBuffer,
31
32 pub video_header: Option<BroadcastFrame>,
34
35 pub audio_header: Option<BroadcastFrame>,
37
38 pub metadata: Option<BroadcastFrame>,
40
41 pub publisher_id: Option<u64>,
43
44 pub(super) tx: broadcast::Sender<BroadcastFrame>,
46
47 pub subscriber_count: AtomicU32,
49
50 pub publisher_disconnected_at: Option<Instant>,
52
53 pub created_at: Instant,
55
56 pub state: StreamState,
58}
59
60impl StreamEntry {
61 pub(super) fn new(config: &RegistryConfig) -> Self {
63 let (tx, _) = broadcast::channel(config.broadcast_capacity);
64
65 Self {
66 gop_buffer: GopBuffer::with_max_size(config.max_gop_size),
67 video_header: None,
68 audio_header: None,
69 metadata: None,
70 publisher_id: None,
71 tx,
72 subscriber_count: AtomicU32::new(0),
73 publisher_disconnected_at: None,
74 created_at: Instant::now(),
75 state: StreamState::Idle,
76 }
77 }
78
79 pub fn subscriber_count(&self) -> u32 {
81 self.subscriber_count.load(Ordering::Relaxed)
82 }
83
84 pub fn has_publisher(&self) -> bool {
86 self.publisher_id.is_some()
87 }
88
89 pub fn get_catchup_frames(&self) -> Vec<BroadcastFrame> {
93 let mut frames = Vec::new();
94
95 if let Some(ref meta) = self.metadata {
97 frames.push(meta.clone());
98 }
99
100 if let Some(ref video) = self.video_header {
102 frames.push(video.clone());
103 }
104 if let Some(ref audio) = self.audio_header {
105 frames.push(audio.clone());
106 }
107
108 for tag in self.gop_buffer.get_catchup_data() {
110 frames.push(BroadcastFrame::from_flv_tag(&tag));
111 }
112
113 frames
114 }
115
116 pub(super) fn subscribe(&self) -> broadcast::Receiver<BroadcastFrame> {
118 self.tx.subscribe()
119 }
120
121 pub(super) fn send(&self, frame: BroadcastFrame) -> usize {
125 self.tx.send(frame).unwrap_or(0)
126 }
127
128 pub(super) fn update_caches(&mut self, frame: &BroadcastFrame) {
130 use super::frame::FrameType;
131
132 match frame.frame_type {
133 FrameType::Video if frame.is_header => {
134 self.video_header = Some(frame.clone());
135 }
136 FrameType::Audio if frame.is_header => {
137 self.audio_header = Some(frame.clone());
138 }
139 FrameType::Metadata => {
140 self.metadata = Some(frame.clone());
141 }
142 _ => {}
143 }
144
145 if frame.frame_type == FrameType::Video && !frame.is_header {
147 let tag = FlvTag::video(frame.timestamp, frame.data.clone());
148 self.gop_buffer.push(tag);
149 }
150 }
151}
152
153#[derive(Debug, Clone)]
155pub struct StreamStats {
156 pub subscriber_count: u32,
158 pub has_publisher: bool,
160 pub state: StreamState,
162 pub gop_frame_count: usize,
164 pub gop_size_bytes: usize,
166}