rtmp_rs/registry/
entry.rs

1//! Stream entry and state types
2//!
3//! This module defines the per-stream state stored in the registry.
4
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::time::Instant;
7
8use tokio::sync::broadcast;
9
10use crate::media::flv::FlvTag;
11use crate::media::gop::GopBuffer;
12
13use super::config::RegistryConfig;
14use super::frame::BroadcastFrame;
15
16/// State of a stream entry
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum StreamState {
19    /// Stream has an active publisher
20    Active,
21    /// Publisher disconnected, within grace period
22    GracePeriod,
23    /// No publisher, waiting for cleanup
24    Idle,
25}
26
27/// Entry for a single stream in the registry
28pub struct StreamEntry {
29    /// GOP buffer for late-joiner support
30    pub gop_buffer: GopBuffer,
31
32    /// Cached video sequence header for fast subscriber catchup
33    pub video_header: Option<BroadcastFrame>,
34
35    /// Cached audio sequence header for fast subscriber catchup
36    pub audio_header: Option<BroadcastFrame>,
37
38    /// Cached metadata
39    pub metadata: Option<BroadcastFrame>,
40
41    /// Current publisher's session ID (None if no publisher)
42    pub publisher_id: Option<u64>,
43
44    /// Broadcast sender for fan-out to subscribers
45    pub(super) tx: broadcast::Sender<BroadcastFrame>,
46
47    /// Number of active subscribers
48    pub subscriber_count: AtomicU32,
49
50    /// When the publisher disconnected (for grace period tracking)
51    pub publisher_disconnected_at: Option<Instant>,
52
53    /// When the stream was created
54    pub created_at: Instant,
55
56    /// Current stream state
57    pub state: StreamState,
58}
59
60impl StreamEntry {
61    /// Create a new stream entry
62    pub(super) fn new(config: &RegistryConfig) -> Self {
63        let (tx, _) = broadcast::channel(config.broadcast_capacity);
64
65        Self {
66            gop_buffer: GopBuffer::with_max_size(config.max_gop_size),
67            video_header: None,
68            audio_header: None,
69            metadata: None,
70            publisher_id: None,
71            tx,
72            subscriber_count: AtomicU32::new(0),
73            publisher_disconnected_at: None,
74            created_at: Instant::now(),
75            state: StreamState::Idle,
76        }
77    }
78
79    /// Get the number of subscribers
80    pub fn subscriber_count(&self) -> u32 {
81        self.subscriber_count.load(Ordering::Relaxed)
82    }
83
84    /// Check if the stream has an active publisher
85    pub fn has_publisher(&self) -> bool {
86        self.publisher_id.is_some()
87    }
88
89    /// Get catchup frames for a new subscriber
90    ///
91    /// Returns sequence headers followed by GOP buffer contents.
92    pub fn get_catchup_frames(&self) -> Vec<BroadcastFrame> {
93        let mut frames = Vec::new();
94
95        // Add metadata first
96        if let Some(ref meta) = self.metadata {
97            frames.push(meta.clone());
98        }
99
100        // Add sequence headers
101        if let Some(ref video) = self.video_header {
102            frames.push(video.clone());
103        }
104        if let Some(ref audio) = self.audio_header {
105            frames.push(audio.clone());
106        }
107
108        // Add GOP buffer contents
109        for tag in self.gop_buffer.get_catchup_data() {
110            frames.push(BroadcastFrame::from_flv_tag(&tag));
111        }
112
113        frames
114    }
115
116    /// Subscribe to this stream's broadcast channel
117    pub(super) fn subscribe(&self) -> broadcast::Receiver<BroadcastFrame> {
118        self.tx.subscribe()
119    }
120
121    /// Send a frame to all subscribers
122    ///
123    /// Returns the number of receivers that received the message, or 0 if there are no receivers.
124    pub(super) fn send(&self, frame: BroadcastFrame) -> usize {
125        self.tx.send(frame).unwrap_or(0)
126    }
127
128    /// Update cached headers and GOP buffer based on frame type
129    pub(super) fn update_caches(&mut self, frame: &BroadcastFrame) {
130        use super::frame::FrameType;
131
132        match frame.frame_type {
133            FrameType::Video if frame.is_header => {
134                self.video_header = Some(frame.clone());
135            }
136            FrameType::Audio if frame.is_header => {
137                self.audio_header = Some(frame.clone());
138            }
139            FrameType::Metadata => {
140                self.metadata = Some(frame.clone());
141            }
142            _ => {}
143        }
144
145        // Update GOP buffer for video frames (non-headers)
146        if frame.frame_type == FrameType::Video && !frame.is_header {
147            let tag = FlvTag::video(frame.timestamp, frame.data.clone());
148            self.gop_buffer.push(tag);
149        }
150    }
151}
152
153/// Statistics for a stream
154#[derive(Debug, Clone)]
155pub struct StreamStats {
156    /// Number of active subscribers
157    pub subscriber_count: u32,
158    /// Whether the stream has an active publisher
159    pub has_publisher: bool,
160    /// Current stream state
161    pub state: StreamState,
162    /// Number of frames in GOP buffer
163    pub gop_frame_count: usize,
164    /// Size of GOP buffer in bytes
165    pub gop_size_bytes: usize,
166}