use bytes::Bytes;
use rml_rtmp::time::RtmpTimestamp;
use std::collections::VecDeque;
use std::sync::Arc;
/// One buffered media frame, tagged by media kind.
///
/// Both variants carry the same pair of fields: the RTMP timestamp of the
/// frame and the frame payload.
#[derive(Clone)]
pub(crate) enum FrameData {
    /// A video frame.
    Video {
        /// RTMP timestamp of this frame.
        timestamp: RtmpTimestamp,
        /// Payload bytes of this frame.
        data: Bytes,
    },
    /// An audio frame.
    Audio {
        /// RTMP timestamp of this frame.
        timestamp: RtmpTimestamp,
        /// Payload bytes of this frame.
        data: Bytes,
    },
}
/// A completed GOP (group of pictures) exposed read-only.
///
/// The frames live in a shared `Arc<[FrameData]>`, so cloning a `FrozenGop`
/// clones the `Arc` handle rather than the frames themselves.
#[derive(Clone)]
pub struct FrozenGop {
    /// Frames belonging to this GOP.
    frames: Arc<[FrameData]>,
}
impl FrozenGop {
    /// Freezes `frames` into a reference-counted slice.
    ///
    /// The `Vec` is converted straight into an `Arc<[FrameData]>`. Going
    /// through `Vec::into_boxed_slice` first would discard the buffer's excess
    /// capacity (possibly reallocating) only for `Arc::from` to move the
    /// elements into yet another allocation.
    fn new(frames: Vec<FrameData>) -> Self {
        Self {
            frames: Arc::from(frames),
        }
    }

    /// Returns the frames of this GOP as a slice.
    pub fn frames(&self) -> &[FrameData] {
        &self.frames
    }

    /// Number of frames in this GOP (test-only helper).
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.frames.len()
    }

    /// Number of `Arc` handles currently sharing this GOP's frame storage
    /// (test-only helper).
    #[cfg(test)]
    pub fn strong_count(&self) -> usize {
        Arc::strong_count(&self.frames)
    }
}
/// Bounded cache of recently completed GOPs plus the GOP still being built.
pub struct Gops {
    /// Completed GOPs; new ones are appended at the back and the front one is
    /// evicted when the limit is reached.
    frozen: VecDeque<FrozenGop>,
    /// Frames of the GOP that is still being accumulated.
    current: Vec<FrameData>,
    /// Maximum number of entries kept in `frozen`; `0` turns the cache off.
    max_gops: usize,
}
impl Default for Gops {
fn default() -> Self {
Self::new(1)
}
}
impl Clone for Gops {
    /// Clones the cache field by field.
    ///
    /// Each frozen GOP is cloned through its own `Clone` impl, the in-progress
    /// frame buffer is cloned element by element, and the limit is copied.
    fn clone(&self) -> Self {
        let Self {
            frozen,
            current,
            max_gops,
        } = self;
        Self {
            frozen: frozen.clone(),
            current: current.clone(),
            max_gops: *max_gops,
        }
    }
}
impl Gops {
    /// Capacity reserved up front for the frame buffer of each in-progress GOP.
    const INITIAL_GOP_CAPACITY: usize = 256;

    /// Largest number of frozen-GOP slots that [`Gops::new`] allocates eagerly.
    /// Larger limits are still honoured; the queue simply grows on demand.
    const MAX_PREALLOCATED_GOPS: usize = 16;

    /// Creates a cache that retains at most `max_gops` completed GOPs.
    ///
    /// Passing `0` disables the cache: [`Gops::save_frame_data`] then drops
    /// every frame, so no frame buffer is allocated for it.
    ///
    /// Preallocation of the frozen queue is capped so that a very large
    /// `max_gops` (for example `usize::MAX` used as "unlimited") neither
    /// panics with a capacity overflow nor reserves memory that may never be
    /// needed.
    pub fn new(max_gops: usize) -> Self {
        let current = if max_gops == 0 {
            Vec::new()
        } else {
            Vec::with_capacity(Self::INITIAL_GOP_CAPACITY)
        };
        Self {
            frozen: VecDeque::with_capacity(max_gops.min(Self::MAX_PREALLOCATED_GOPS)),
            current,
            max_gops,
        }
    }

    /// Appends `data` to the GOP being built, starting a new GOP on key frames.
    ///
    /// When `is_key_frame` is true and frames are already buffered, the
    /// buffered frames are frozen and queued (evicting the oldest frozen GOP
    /// if the queue is at `max_gops`), and `data` becomes the first frame of
    /// the next GOP. Does nothing when the cache is disabled.
    ///
    /// Frames saved before the first key frame are buffered as well, so the
    /// first frozen GOP may not begin with a key frame.
    ///
    /// NOTE(review): the in-progress buffer is only flushed by a key frame, so
    /// it grows without bound if the caller never passes `is_key_frame = true`
    /// (presumably possible for audio-only input) — confirm callers guard
    /// against that.
    pub fn save_frame_data(&mut self, data: FrameData, is_key_frame: bool) {
        if !self.is_enabled() {
            return;
        }

        if is_key_frame && !self.current.is_empty() {
            // Swap in a fresh, pre-sized buffer and freeze the finished one.
            let finished = std::mem::replace(
                &mut self.current,
                Vec::with_capacity(Self::INITIAL_GOP_CAPACITY),
            );
            if self.frozen.len() >= self.max_gops {
                self.frozen.pop_front();
            }
            self.frozen.push_back(FrozenGop::new(finished));
        }

        self.current.push(data);
    }

    /// Iterates over the frozen GOPs by reference, front to back (test-only).
    #[cfg(test)]
    // Currently has no callers among the tests visible in this file.
    #[allow(dead_code)]
    pub fn frozen_gops(&self) -> impl Iterator<Item = &FrozenGop> {
        self.frozen.iter()
    }

    /// Iterates over clones of the frozen GOPs, front to back.
    ///
    /// The GOP still being built is not included.
    pub fn get_frozen_gops(&self) -> impl Iterator<Item = FrozenGop> + '_ {
        self.frozen.iter().cloned()
    }

    /// Frames of the GOP still being built (test-only).
    #[cfg(test)]
    pub fn get_current_frames(&self) -> &[FrameData] {
        &self.current
    }

    /// Returns `true` when the cache retains frames, i.e. `max_gops > 0`.
    pub fn is_enabled(&self) -> bool {
        self.max_gops > 0
    }

    /// Number of frozen GOPs currently held (test-only).
    #[cfg(test)]
    pub fn frozen_count(&self) -> usize {
        self.frozen.len()
    }

    /// Total number of frames across all frozen GOPs (test-only).
    #[cfg(test)]
    fn frozen_frame_count(&self) -> usize {
        self.frozen.iter().map(FrozenGop::len).sum()
    }

    /// Number of frames in the GOP still being built (test-only).
    #[cfg(test)]
    fn current_frame_count(&self) -> usize {
        self.current.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a video frame with the given timestamp and payload.
    fn make_video_frame(ts: u32, data: &[u8]) -> FrameData {
        let timestamp = RtmpTimestamp { value: ts };
        let data = Bytes::copy_from_slice(data);
        FrameData::Video { timestamp, data }
    }

    /// Builds an audio frame with the given timestamp and payload.
    fn make_audio_frame(ts: u32, data: &[u8]) -> FrameData {
        let timestamp = RtmpTimestamp { value: ts };
        let data = Bytes::copy_from_slice(data);
        FrameData::Audio { timestamp, data }
    }

    /// Cloning a frozen GOP shares its frame storage instead of copying it.
    #[test]
    fn test_frozen_gop_zero_copy() {
        let mut cache = Gops::new(2);
        cache.save_frame_data(make_video_frame(0, b"keyframe1"), true);
        cache.save_frame_data(make_video_frame(33, b"frame2"), false);
        cache.save_frame_data(make_audio_frame(40, b"audio1"), false);
        // The second key frame freezes the three frames saved so far.
        cache.save_frame_data(make_video_frame(66, b"keyframe2"), true);

        let snapshot: Vec<_> = cache.get_frozen_gops().collect();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0].len(), 3);

        let first = snapshot[0].clone();
        let second = first.clone();
        assert!(first.strong_count() >= 2);
        assert_eq!(first.strong_count(), second.strong_count());
    }

    /// Each key frame closes the GOP in progress and opens a new one.
    #[test]
    fn test_gop_boundary_correctness() {
        let mut cache = Gops::new(3);
        cache.save_frame_data(make_video_frame(0, b"k1"), true);
        cache.save_frame_data(make_video_frame(33, b"p1"), false);
        cache.save_frame_data(make_video_frame(66, b"k2"), true);
        cache.save_frame_data(make_video_frame(100, b"p2"), false);
        cache.save_frame_data(make_video_frame(133, b"k3"), true);

        assert_eq!(cache.frozen_count(), 2);
        assert_eq!(cache.frozen_frame_count(), 4);
        assert_eq!(cache.current_frame_count(), 1);
    }

    /// The frozen queue never holds more than `max_gops` entries.
    #[test]
    fn test_max_gops_limit() {
        let mut cache = Gops::new(2);
        cache.save_frame_data(make_video_frame(0, b"k1"), true);
        cache.save_frame_data(make_video_frame(33, b"k2"), true);
        cache.save_frame_data(make_video_frame(66, b"k3"), true);
        cache.save_frame_data(make_video_frame(100, b"k4"), true);

        assert_eq!(cache.frozen_count(), 2);
    }

    /// Back-to-back key frames produce single-frame GOPs.
    #[test]
    fn test_repeated_keyframes() {
        let mut cache = Gops::new(3);
        cache.save_frame_data(make_video_frame(0, b"k1"), true);
        cache.save_frame_data(make_video_frame(33, b"k2"), true);
        cache.save_frame_data(make_video_frame(66, b"k3"), true);

        assert_eq!(cache.frozen_count(), 2);
        let snapshot: Vec<_> = cache.get_frozen_gops().collect();
        assert_eq!(snapshot[0].len(), 1);
        assert_eq!(snapshot[1].len(), 1);
    }

    /// A cache created with a limit of zero stores nothing.
    #[test]
    fn test_disabled_gop_cache() {
        let mut cache = Gops::new(0);
        cache.save_frame_data(make_video_frame(0, b"k1"), true);
        cache.save_frame_data(make_video_frame(33, b"k2"), true);

        assert_eq!(cache.frozen_count(), 0);
        assert!(!cache.is_enabled());
    }

    /// The very first key frame has nothing to freeze; it only starts a GOP.
    #[test]
    fn test_empty_current_gop_on_first_keyframe() {
        let mut cache = Gops::new(2);
        cache.save_frame_data(make_video_frame(0, b"k1"), true);

        assert_eq!(cache.frozen_count(), 0);
        assert_eq!(cache.get_current_frames().len(), 1);
    }
}