1use bytes::Bytes;
9use rml_rtmp::time::RtmpTimestamp;
10use std::collections::VecDeque;
11use std::sync::Arc;
12
13#[derive(Clone)]
16pub(crate) enum FrameData {
17 Video {
18 timestamp: RtmpTimestamp,
19 data: Bytes,
20 },
21 Audio {
22 timestamp: RtmpTimestamp,
23 data: Bytes,
24 },
25}
26
27#[derive(Clone)]
31pub struct FrozenGop {
32 frames: Arc<[FrameData]>,
33}
34
35impl FrozenGop {
36 fn new(frames: Vec<FrameData>) -> Self {
37 Self {
38 frames: Arc::from(frames.into_boxed_slice()),
39 }
40 }
41
42 pub fn frames(&self) -> &[FrameData] {
44 &self.frames
45 }
46
47 #[cfg(test)]
48 pub fn len(&self) -> usize {
49 self.frames.len()
50 }
51
52 #[cfg(test)]
54 pub fn strong_count(&self) -> usize {
55 Arc::strong_count(&self.frames)
56 }
57}
58
59pub struct Gops {
65 frozen: VecDeque<FrozenGop>,
66 current: Vec<FrameData>,
67 max_gops: usize,
68}
69
70impl Default for Gops {
71 fn default() -> Self {
72 Self::new(1)
73 }
74}
75
76impl Clone for Gops {
77 fn clone(&self) -> Self {
78 Self {
79 frozen: self.frozen.clone(), current: self.current.clone(),
81 max_gops: self.max_gops,
82 }
83 }
84}
85
86impl Gops {
87 pub fn new(max_gops: usize) -> Self {
88 Self {
89 frozen: VecDeque::with_capacity(max_gops),
90 current: Vec::with_capacity(256),
91 max_gops,
92 }
93 }
94
95 pub fn save_frame_data(&mut self, data: FrameData, is_key_frame: bool) {
103 if self.max_gops == 0 {
104 return;
105 }
106
107 if is_key_frame && !self.current.is_empty() {
109 let frames = std::mem::take(&mut self.current);
111 let frozen = FrozenGop::new(frames);
112
113 if self.frozen.len() >= self.max_gops {
115 self.frozen.pop_front();
116 }
117 self.frozen.push_back(frozen);
118
119 self.current.reserve(256);
121 }
122
123 self.current.push(data);
124 }
125
126 #[cfg(test)]
128 #[allow(dead_code)]
129 pub fn frozen_gops(&self) -> impl Iterator<Item = &FrozenGop> {
130 self.frozen.iter()
131 }
132
133 pub fn get_frozen_gops(&self) -> impl Iterator<Item = FrozenGop> + '_ {
137 self.frozen.iter().cloned()
138 }
139
140 #[cfg(test)]
142 pub fn get_current_frames(&self) -> &[FrameData] {
143 &self.current
144 }
145
146 pub fn is_enabled(&self) -> bool {
148 self.max_gops > 0
149 }
150
151 #[cfg(test)]
153 pub fn frozen_count(&self) -> usize {
154 self.frozen.len()
155 }
156
157 #[cfg(test)]
159 fn frozen_frame_count(&self) -> usize {
160 self.frozen.iter().map(|g| g.len()).sum()
161 }
162
163 #[cfg(test)]
165 fn current_frame_count(&self) -> usize {
166 self.current.len()
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 use super::*;
173
174 fn make_video_frame(ts: u32, data: &[u8]) -> FrameData {
175 FrameData::Video {
176 timestamp: RtmpTimestamp { value: ts },
177 data: Bytes::copy_from_slice(data),
178 }
179 }
180
181 fn make_audio_frame(ts: u32, data: &[u8]) -> FrameData {
182 FrameData::Audio {
183 timestamp: RtmpTimestamp { value: ts },
184 data: Bytes::copy_from_slice(data),
185 }
186 }
187
188 #[test]
189 fn test_frozen_gop_zero_copy() {
190 let mut gops = Gops::new(2);
191
192 gops.save_frame_data(make_video_frame(0, b"keyframe1"), true);
194 gops.save_frame_data(make_video_frame(33, b"frame2"), false);
195 gops.save_frame_data(make_audio_frame(40, b"audio1"), false);
196
197 gops.save_frame_data(make_video_frame(66, b"keyframe2"), true);
199
200 let frozen: Vec<_> = gops.get_frozen_gops().collect();
202 assert_eq!(frozen.len(), 1);
203 assert_eq!(frozen[0].len(), 3); let gop1 = frozen[0].clone();
207 let gop2 = gop1.clone();
208
209 assert!(gop1.strong_count() >= 2);
211 assert_eq!(gop1.strong_count(), gop2.strong_count());
212 }
213
214 #[test]
215 fn test_gop_boundary_correctness() {
216 let mut gops = Gops::new(3);
217
218 gops.save_frame_data(make_video_frame(0, b"k1"), true);
220 gops.save_frame_data(make_video_frame(33, b"p1"), false);
221
222 gops.save_frame_data(make_video_frame(66, b"k2"), true);
224 gops.save_frame_data(make_video_frame(100, b"p2"), false);
225
226 gops.save_frame_data(make_video_frame(133, b"k3"), true);
228
229 assert_eq!(gops.frozen_count(), 2);
231 assert_eq!(gops.frozen_frame_count(), 4); assert_eq!(gops.current_frame_count(), 1); }
234
235 #[test]
236 fn test_max_gops_limit() {
237 let mut gops = Gops::new(2);
238
239 gops.save_frame_data(make_video_frame(0, b"k1"), true);
241 gops.save_frame_data(make_video_frame(33, b"k2"), true);
242 gops.save_frame_data(make_video_frame(66, b"k3"), true);
243 gops.save_frame_data(make_video_frame(100, b"k4"), true);
244
245 assert_eq!(gops.frozen_count(), 2);
247 }
248
249 #[test]
250 fn test_repeated_keyframes() {
251 let mut gops = Gops::new(3);
252
253 gops.save_frame_data(make_video_frame(0, b"k1"), true);
255 gops.save_frame_data(make_video_frame(33, b"k2"), true); gops.save_frame_data(make_video_frame(66, b"k3"), true); assert_eq!(gops.frozen_count(), 2);
260
261 let frozen: Vec<_> = gops.get_frozen_gops().collect();
262 assert_eq!(frozen[0].len(), 1);
263 assert_eq!(frozen[1].len(), 1);
264 }
265
266 #[test]
267 fn test_disabled_gop_cache() {
268 let mut gops = Gops::new(0);
269
270 gops.save_frame_data(make_video_frame(0, b"k1"), true);
271 gops.save_frame_data(make_video_frame(33, b"k2"), true);
272
273 assert_eq!(gops.frozen_count(), 0);
275 assert!(!gops.is_enabled());
276 }
277
278 #[test]
279 fn test_empty_current_gop_on_first_keyframe() {
280 let mut gops = Gops::new(2);
281
282 gops.save_frame_data(make_video_frame(0, b"k1"), true);
284
285 assert_eq!(gops.frozen_count(), 0);
286 assert_eq!(gops.get_current_frames().len(), 1);
287 }
288}