Skip to main content

ez_ffmpeg/rtmp/
gop.rs

1// src/rtmp/gop.rs - Optimized version (Zero-Copy GOP)
2//
3// Optimizations:
4// 1. FrozenGop uses Arc<[FrameData]> for O(1) clone
5// 2. Separates frozen (completed) and current (writing) GOP
6// 3. Keeps old API compatibility layer, marked deprecated
7
8use bytes::Bytes;
9use rml_rtmp::time::RtmpTimestamp;
10use std::collections::VecDeque;
11use std::sync::Arc;
12
13/// Frame data - Structure unchanged
14/// ⚠️ Note: Keyframe detection is caller's responsibility, not implemented here
15#[derive(Clone)]
16pub(crate) enum FrameData {
17    Video {
18        timestamp: RtmpTimestamp,
19        data: Bytes,
20    },
21    Audio {
22        timestamp: RtmpTimestamp,
23        data: Bytes,
24    },
25}
26
27/// Frozen GOP - Immutable, O(1) clone
28///
29/// Uses `Arc<[FrameData]>` instead of `Arc<Vec<FrameData>>` to reduce indirection
30#[derive(Clone)]
31pub struct FrozenGop {
32    frames: Arc<[FrameData]>,
33}
34
35impl FrozenGop {
36    fn new(frames: Vec<FrameData>) -> Self {
37        Self {
38            frames: Arc::from(frames.into_boxed_slice()),
39        }
40    }
41
42    /// Get frame data slice - zero-copy
43    pub fn frames(&self) -> &[FrameData] {
44        &self.frames
45    }
46
47    #[cfg(test)]
48    pub fn len(&self) -> usize {
49        self.frames.len()
50    }
51
52    /// Get Arc strong reference count (for testing zero-copy)
53    #[cfg(test)]
54    pub fn strong_count(&self) -> usize {
55        Arc::strong_count(&self.frames)
56    }
57}
58
59/// Optimized GOP manager
60///
61/// Core improvements:
62/// - frozen: Completed GOPs, using FrozenGop for zero-copy sharing
63/// - current: Currently writing GOP
64pub struct Gops {
65    frozen: VecDeque<FrozenGop>,
66    current: Vec<FrameData>,
67    max_gops: usize,
68}
69
70impl Default for Gops {
71    fn default() -> Self {
72        Self::new(1)
73    }
74}
75
76impl Clone for Gops {
77    fn clone(&self) -> Self {
78        Self {
79            frozen: self.frozen.clone(), // FrozenGop clone is O(1)
80            current: self.current.clone(),
81            max_gops: self.max_gops,
82        }
83    }
84}
85
86impl Gops {
87    pub fn new(max_gops: usize) -> Self {
88        Self {
89            frozen: VecDeque::with_capacity(max_gops),
90            current: Vec::with_capacity(256),
91            max_gops,
92        }
93    }
94
95    /// Save frame data
96    ///
97    /// # Arguments
98    /// * `data` - Frame data
99    /// * `is_key_frame` - Whether it's a keyframe (determined by caller from video data)
100    ///
101    /// ⚠️ Keyframe detection must be correctly implemented by caller, wrong detection breaks GOP boundaries
102    pub fn save_frame_data(&mut self, data: FrameData, is_key_frame: bool) {
103        if self.max_gops == 0 {
104            return;
105        }
106
107        // When keyframe encountered, freeze current GOP
108        if is_key_frame && !self.current.is_empty() {
109            // Take current frames and create frozen GOP
110            let frames = std::mem::take(&mut self.current);
111            let frozen = FrozenGop::new(frames);
112
113            // If limit exceeded, remove oldest GOP
114            if self.frozen.len() >= self.max_gops {
115                self.frozen.pop_front();
116            }
117            self.frozen.push_back(frozen);
118
119            // Re-preallocate capacity
120            self.current.reserve(256);
121        }
122
123        self.current.push(data);
124    }
125
126    /// Get reference iterator for all frozen GOPs (test only)
127    #[cfg(test)]
128    #[allow(dead_code)]
129    pub fn frozen_gops(&self) -> impl Iterator<Item = &FrozenGop> {
130        self.frozen.iter()
131    }
132
133    /// Get clone iterator for frozen GOPs
134    ///
135    /// Each FrozenGop clone is O(1), only increments Arc reference count
136    pub fn get_frozen_gops(&self) -> impl Iterator<Item = FrozenGop> + '_ {
137        self.frozen.iter().cloned()
138    }
139
140    /// Get frames of currently writing GOP (test only)
141    #[cfg(test)]
142    pub fn get_current_frames(&self) -> &[FrameData] {
143        &self.current
144    }
145
146    /// Whether GOP caching is enabled
147    pub fn is_enabled(&self) -> bool {
148        self.max_gops > 0
149    }
150
151    /// Frozen GOP count (test only)
152    #[cfg(test)]
153    pub fn frozen_count(&self) -> usize {
154        self.frozen.len()
155    }
156
157    /// Frozen frames count (test only)
158    #[cfg(test)]
159    fn frozen_frame_count(&self) -> usize {
160        self.frozen.iter().map(|g| g.len()).sum()
161    }
162
163    /// Current GOP frame count (test only)
164    #[cfg(test)]
165    fn current_frame_count(&self) -> usize {
166        self.current.len()
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173
174    fn make_video_frame(ts: u32, data: &[u8]) -> FrameData {
175        FrameData::Video {
176            timestamp: RtmpTimestamp { value: ts },
177            data: Bytes::copy_from_slice(data),
178        }
179    }
180
181    fn make_audio_frame(ts: u32, data: &[u8]) -> FrameData {
182        FrameData::Audio {
183            timestamp: RtmpTimestamp { value: ts },
184            data: Bytes::copy_from_slice(data),
185        }
186    }
187
188    #[test]
189    fn test_frozen_gop_zero_copy() {
190        let mut gops = Gops::new(2);
191
192        // Add first keyframe and some frames
193        gops.save_frame_data(make_video_frame(0, b"keyframe1"), true);
194        gops.save_frame_data(make_video_frame(33, b"frame2"), false);
195        gops.save_frame_data(make_audio_frame(40, b"audio1"), false);
196
197        // New keyframe triggers freeze
198        gops.save_frame_data(make_video_frame(66, b"keyframe2"), true);
199
200        // Should have one frozen GOP
201        let frozen: Vec<_> = gops.get_frozen_gops().collect();
202        assert_eq!(frozen.len(), 1);
203        assert_eq!(frozen[0].len(), 3); // keyframe1, frame2, audio1
204
205        // Verify zero-copy
206        let gop1 = frozen[0].clone();
207        let gop2 = gop1.clone();
208
209        // Arc reference count should increase
210        assert!(gop1.strong_count() >= 2);
211        assert_eq!(gop1.strong_count(), gop2.strong_count());
212    }
213
214    #[test]
215    fn test_gop_boundary_correctness() {
216        let mut gops = Gops::new(3);
217
218        // GOP 1
219        gops.save_frame_data(make_video_frame(0, b"k1"), true);
220        gops.save_frame_data(make_video_frame(33, b"p1"), false);
221
222        // GOP 2
223        gops.save_frame_data(make_video_frame(66, b"k2"), true);
224        gops.save_frame_data(make_video_frame(100, b"p2"), false);
225
226        // GOP 3
227        gops.save_frame_data(make_video_frame(133, b"k3"), true);
228
229        // Should have 2 frozen GOPs
230        assert_eq!(gops.frozen_count(), 2);
231        assert_eq!(gops.frozen_frame_count(), 4); // (k1+p1) + (k2+p2)
232        assert_eq!(gops.current_frame_count(), 1); // k3
233    }
234
235    #[test]
236    fn test_max_gops_limit() {
237        let mut gops = Gops::new(2);
238
239        // Create 3 GOPs, should only keep 2
240        gops.save_frame_data(make_video_frame(0, b"k1"), true);
241        gops.save_frame_data(make_video_frame(33, b"k2"), true);
242        gops.save_frame_data(make_video_frame(66, b"k3"), true);
243        gops.save_frame_data(make_video_frame(100, b"k4"), true);
244
245        // Oldest GOP should be removed
246        assert_eq!(gops.frozen_count(), 2);
247    }
248
249    #[test]
250    fn test_repeated_keyframes() {
251        let mut gops = Gops::new(3);
252
253        // Consecutive keyframes
254        gops.save_frame_data(make_video_frame(0, b"k1"), true);
255        gops.save_frame_data(make_video_frame(33, b"k2"), true); // Triggers freeze
256        gops.save_frame_data(make_video_frame(66, b"k3"), true); // Triggers freeze
257
258        // Should have 2 frozen GOPs, each with only 1 frame
259        assert_eq!(gops.frozen_count(), 2);
260
261        let frozen: Vec<_> = gops.get_frozen_gops().collect();
262        assert_eq!(frozen[0].len(), 1);
263        assert_eq!(frozen[1].len(), 1);
264    }
265
266    #[test]
267    fn test_disabled_gop_cache() {
268        let mut gops = Gops::new(0);
269
270        gops.save_frame_data(make_video_frame(0, b"k1"), true);
271        gops.save_frame_data(make_video_frame(33, b"k2"), true);
272
273        // Should not save any frames when disabled
274        assert_eq!(gops.frozen_count(), 0);
275        assert!(!gops.is_enabled());
276    }
277
278    #[test]
279    fn test_empty_current_gop_on_first_keyframe() {
280        let mut gops = Gops::new(2);
281
282        // First keyframe, current is empty, should not create empty frozen GOP
283        gops.save_frame_data(make_video_frame(0, b"k1"), true);
284
285        assert_eq!(gops.frozen_count(), 0);
286        assert_eq!(gops.get_current_frames().len(), 1);
287    }
288}