#![allow(dead_code)]
#![allow(clippy::module_name_repetitions)]
use bytes::Bytes;
use std::collections::VecDeque;
use std::time::{Duration, SystemTime};
/// Default upper bound on the number of segments a `DvrBuffer` keeps.
///
/// Both constructors start with this cap; `DvrBuffer::set_max_segments` can change it later.
const MAX_DVR_SEGMENTS: usize = 500;
/// Sliding-window store of media segments that is trimmed by time, segment count and byte size.
///
/// Segments are appended at the back and evicted from the front whenever a limit is exceeded.
#[derive(Debug)]
pub struct DvrBuffer {
/// Retained segments in insertion order (front = oldest inserted, back = newest inserted).
segments: VecDeque<DvrSegment>,
/// Length of the time window to keep, measured back from the end of the newest segment.
buffer_depth: Duration,
/// Maximum number of segments kept; initialised to `MAX_DVR_SEGMENTS`.
max_segments: usize,
/// Running sum of `data.len()` over all retained segments, in bytes.
total_size: u64,
/// Maximum total payload size in bytes; `0` disables the size limit.
max_size: u64,
}
/// A single media segment together with the timing metadata the buffer needs to index it.
#[derive(Debug, Clone)]
pub struct DvrSegment {
/// Segment number; combined with `representation_id` to look a segment up.
pub number: u64,
/// Identifier of the representation this segment belongs to.
pub representation_id: String,
/// Segment payload; its length is what the buffer counts towards its total size.
pub data: Bytes,
/// Start time of the segment on the media timeline
/// (presumably the presentation timestamp; confirm with the producer of these values).
pub pts: Duration,
/// Length of the segment; the segment ends at `pts + duration`.
pub duration: Duration,
/// Wall-clock timestamp; `DvrSegment::new` sets it to the time of construction.
pub wall_clock_time: SystemTime,
/// Timescale associated with the segment. It is stored but not interpreted in this file
/// (presumably ticks per second; TODO confirm).
pub timescale: u32,
}
impl DvrBuffer {
    /// Creates an empty buffer that keeps a time window of `buffer_depth`.
    ///
    /// The segment count is capped at `MAX_DVR_SEGMENTS` and no byte limit is applied.
    #[must_use]
    pub fn new(buffer_depth: Duration) -> Self {
        // A `max_size` of zero disables the byte limit in `trim_buffer`.
        Self::with_max_size(buffer_depth, 0)
    }

    /// Creates an empty buffer that keeps a time window of `buffer_depth` and at most
    /// `max_size` bytes of payload.
    ///
    /// Passing `0` for `max_size` disables the byte limit.
    #[must_use]
    pub fn with_max_size(buffer_depth: Duration, max_size: u64) -> Self {
        Self {
            segments: VecDeque::new(),
            buffer_depth,
            max_segments: MAX_DVR_SEGMENTS,
            total_size: 0,
            max_size,
        }
    }

    /// Appends `segment` at the back of the buffer and then evicts old segments until the
    /// time, count and size limits hold again.
    pub fn add_segment(&mut self, segment: DvrSegment) {
        self.total_size += segment.data.len() as u64;
        self.segments.push_back(segment);
        self.trim_buffer();
    }

    /// Returns the first buffered segment matching both `number` and `representation_id`.
    #[must_use]
    pub fn get_segment(&self, number: u64, representation_id: &str) -> Option<&DvrSegment> {
        self.segments
            .iter()
            .find(|s| s.number == number && s.representation_id == representation_id)
    }

    /// Returns the segments of `representation_id` that overlap the half-open interval
    /// `[start, end)`, in buffer order.
    ///
    /// A segment overlaps when it starts before `end` and ends after `start`.
    #[must_use]
    pub fn get_segments_in_range(
        &self,
        start: Duration,
        end: Duration,
        representation_id: &str,
    ) -> Vec<&DvrSegment> {
        self.segments
            .iter()
            .filter(|s| {
                s.representation_id == representation_id
                    && s.pts < end
                    // Saturate instead of panicking on an overflowing end time.
                    && s.pts.saturating_add(s.duration) > start
            })
            .collect()
    }

    /// Returns every buffered segment that belongs to `representation_id`, in buffer order.
    #[must_use]
    pub fn get_segments_for_representation(&self, representation_id: &str) -> Vec<&DvrSegment> {
        self.segments
            .iter()
            .filter(|s| s.representation_id == representation_id)
            .collect()
    }

    /// Returns the segment at the front of the buffer (the oldest one inserted), if any.
    #[must_use]
    pub fn earliest_segment(&self) -> Option<&DvrSegment> {
        self.segments.front()
    }

    /// Returns the segment at the back of the buffer (the newest one inserted), if any.
    #[must_use]
    pub fn latest_segment(&self) -> Option<&DvrSegment> {
        self.segments.back()
    }

    /// Returns the start time of the front segment, or `None` when the buffer is empty.
    #[must_use]
    pub fn earliest_time(&self) -> Option<Duration> {
        self.segments.front().map(|s| s.pts)
    }

    /// Returns the end time (`pts + duration`) of the back segment, or `None` when the
    /// buffer is empty.
    ///
    /// The sum saturates at `Duration::MAX` rather than panicking on overflow.
    #[must_use]
    pub fn latest_time(&self) -> Option<Duration> {
        self.segments.back().map(|s| s.pts.saturating_add(s.duration))
    }

    /// Returns the span between [`earliest_time`](Self::earliest_time) and
    /// [`latest_time`](Self::latest_time).
    ///
    /// Yields `None` when the buffer is empty or the span would not be positive. Both ends
    /// are taken from insertion order, so the result is only meaningful when segments were
    /// added chronologically.
    #[must_use]
    pub fn available_window(&self) -> Option<Duration> {
        match (self.earliest_time(), self.latest_time()) {
            (Some(start), Some(end)) if end > start => Some(end - start),
            _ => None,
        }
    }

    /// Returns the number of buffered segments.
    #[must_use]
    pub fn segment_count(&self) -> usize {
        self.segments.len()
    }

    /// Returns the total payload size of all buffered segments, in bytes.
    #[must_use]
    pub const fn total_size(&self) -> u64 {
        self.total_size
    }

    /// Returns the configured time window.
    #[must_use]
    pub const fn buffer_depth(&self) -> Duration {
        self.buffer_depth
    }

    /// Changes the time window and immediately evicts segments that fall outside it.
    pub fn set_buffer_depth(&mut self, depth: Duration) {
        self.buffer_depth = depth;
        self.trim_buffer();
    }

    /// Changes the segment-count cap and immediately evicts surplus segments.
    pub fn set_max_segments(&mut self, max: usize) {
        self.max_segments = max;
        self.trim_buffer();
    }

    /// Removes every segment and resets the size accounting.
    pub fn clear(&mut self) {
        self.segments.clear();
        self.total_size = 0;
    }

    /// Removes the front segment and deducts its payload from `total_size`.
    ///
    /// Returns `false` when the buffer was already empty.
    fn evict_front(&mut self) -> bool {
        match self.segments.pop_front() {
            Some(removed) => {
                self.total_size = self.total_size.saturating_sub(removed.data.len() as u64);
                true
            }
            None => false,
        }
    }

    /// Evicts segments from the front until all three limits hold, in this order:
    /// time window, segment count, byte size.
    fn trim_buffer(&mut self) {
        // Time pass. `checked_sub` yields `None` while the buffered span is still shorter
        // than `buffer_depth`; nothing can have expired then. Clamping the cutoff to zero
        // instead would wrongly evict a zero-length segment located at time zero.
        let cutoff = self
            .latest_time()
            .and_then(|latest| latest.checked_sub(self.buffer_depth));
        if let Some(cutoff) = cutoff {
            while let Some(front) = self.segments.front() {
                // Stop at the first segment that still reaches into the window.
                if front.pts.saturating_add(front.duration) > cutoff {
                    break;
                }
                self.evict_front();
            }
        }

        // Count pass.
        while self.segments.len() > self.max_segments {
            self.evict_front();
        }

        // Size pass; a `max_size` of zero means "unlimited".
        if self.max_size > 0 {
            while self.total_size > self.max_size {
                if !self.evict_front() {
                    break;
                }
            }
        }
    }

    /// Captures the current counters and time bounds in a [`DvrStats`] value.
    #[must_use]
    pub fn stats(&self) -> DvrStats {
        DvrStats {
            segment_count: self.segment_count(),
            total_size: self.total_size,
            earliest_time: self.earliest_time(),
            latest_time: self.latest_time(),
            available_window: self.available_window(),
            buffer_depth: self.buffer_depth,
        }
    }

    /// Reports whether a segment with this `number` and `representation_id` is buffered.
    #[must_use]
    pub fn has_segment(&self, number: u64, representation_id: &str) -> bool {
        self.get_segment(number, representation_id).is_some()
    }

    /// Returns the numbers of all buffered segments of `representation_id`, in buffer order.
    #[must_use]
    pub fn available_segment_numbers(&self, representation_id: &str) -> Vec<u64> {
        self.segments
            .iter()
            .filter(|s| s.representation_id == representation_id)
            .map(|s| s.number)
            .collect()
    }
}
/// Snapshot of a DVR buffer's counters and time bounds, as produced by `DvrBuffer::stats`.
#[derive(Debug, Clone)]
pub struct DvrStats {
    /// Number of segments held when the snapshot was taken.
    pub segment_count: usize,
    /// Total payload size of those segments, in bytes.
    pub total_size: u64,
    /// Start time of the oldest buffered segment, if any.
    pub earliest_time: Option<Duration>,
    /// End time of the newest buffered segment, if any.
    pub latest_time: Option<Duration>,
    /// Span between `earliest_time` and `latest_time`, when it is positive.
    pub available_window: Option<Duration>,
    /// Time window the buffer was configured to keep.
    pub buffer_depth: Duration,
}

impl DvrStats {
    /// Returns how much of the configured depth is filled, as a fraction in `0.0..=1.0`.
    ///
    /// The result is `0.0` when there is no available window or when the configured depth
    /// is zero; otherwise it is `available_window / buffer_depth`, capped at `1.0`.
    #[must_use]
    pub fn utilization(&self) -> f64 {
        let depth_secs = self.buffer_depth.as_secs_f64();
        self.available_window
            // A zero depth would divide by zero, so treat it like "no window".
            .filter(|_| depth_secs > 0.0)
            .map_or(0.0, |window| (window.as_secs_f64() / depth_secs).min(1.0))
    }
}
impl DvrSegment {
    /// Creates a segment whose `wall_clock_time` is the moment of construction.
    ///
    /// All other fields are stored exactly as given; `representation_id` is converted into
    /// an owned `String`.
    #[must_use]
    pub fn new(
        number: u64,
        representation_id: impl Into<String>,
        data: Bytes,
        pts: Duration,
        duration: Duration,
        timescale: u32,
    ) -> Self {
        Self {
            number,
            representation_id: representation_id.into(),
            data,
            pts,
            duration,
            wall_clock_time: SystemTime::now(),
            timescale,
        }
    }

    /// Returns the payload length in bytes.
    #[must_use]
    pub fn size(&self) -> usize {
        self.data.len()
    }

    /// Returns the time at which the segment ends (`pts + duration`).
    ///
    /// The sum saturates at `Duration::MAX`; plain `+` on `Duration` would panic on
    /// overflow, and both operands are caller-supplied public fields.
    #[must_use]
    pub fn end_time(&self) -> Duration {
        self.pts.saturating_add(self.duration)
    }

    /// Reports whether `time` lies inside the half-open interval `[pts, end_time)`.
    #[must_use]
    pub fn contains_time(&self, time: Duration) -> bool {
        (self.pts..self.end_time()).contains(&time)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Representation identifier shared by every fixture segment.
    const REPR: &str = "test-repr";

    /// Builds a 1024-byte segment for `REPR` starting at `pts_secs` and lasting
    /// `duration_secs` seconds.
    fn create_test_segment(number: u64, pts_secs: u64, duration_secs: u64) -> DvrSegment {
        let payload = Bytes::from(vec![0u8; 1024]);
        let start = Duration::from_secs(pts_secs);
        let length = Duration::from_secs(duration_secs);
        DvrSegment::new(number, REPR, payload, start, length, 90000)
    }

    /// Adds `count` back-to-back two-second segments, numbered from 1 and starting at
    /// time zero, to `buffer`.
    fn fill(buffer: &mut DvrBuffer, count: u64) {
        for index in 0..count {
            buffer.add_segment(create_test_segment(index + 1, index * 2, 2));
        }
    }

    /// Returns a buffer with a 30-second depth holding `count` fixture segments.
    fn thirty_second_buffer(count: u64) -> DvrBuffer {
        let mut buffer = DvrBuffer::new(Duration::from_secs(30));
        fill(&mut buffer, count);
        buffer
    }

    /// A fresh buffer is empty and reports the depth it was created with.
    #[test]
    fn test_dvr_buffer_creation() {
        let depth = Duration::from_secs(30);
        let buffer = DvrBuffer::new(depth);
        assert_eq!(buffer.segment_count(), 0);
        assert_eq!(buffer.total_size(), 0);
        assert_eq!(buffer.buffer_depth(), depth);
    }

    /// Adding one segment updates both the count and the byte total.
    #[test]
    fn test_add_segment() {
        let buffer = thirty_second_buffer(1);
        assert_eq!(buffer.segment_count(), 1);
        assert_eq!(buffer.total_size(), 1024);
    }

    /// Lookup succeeds for a buffered number and fails for an unknown one.
    #[test]
    fn test_get_segment() {
        let buffer = thirty_second_buffer(2);

        let found = buffer.get_segment(1, REPR);
        assert!(found.is_some());
        assert_eq!(found.expect("should succeed in test").number, 1);

        let missing = buffer.get_segment(99, REPR);
        assert!(missing.is_none());
    }

    /// Twenty seconds of media pushed into a ten-second buffer gets trimmed.
    #[test]
    fn test_time_based_trimming() {
        let mut buffer = DvrBuffer::new(Duration::from_secs(10));
        fill(&mut buffer, 10);
        assert!(buffer.segment_count() <= 6);
    }

    /// The time bounds run from the first segment's start to the last segment's end.
    #[test]
    fn test_earliest_latest_time() {
        let buffer = thirty_second_buffer(3);
        assert_eq!(buffer.earliest_time(), Some(Duration::from_secs(0)));
        assert_eq!(buffer.latest_time(), Some(Duration::from_secs(6)));
    }

    /// Two contiguous two-second segments give a four-second window.
    #[test]
    fn test_available_window() {
        let buffer = thirty_second_buffer(2);
        assert_eq!(buffer.available_window(), Some(Duration::from_secs(4)));
    }

    /// A range that touches all three segments returns all three.
    #[test]
    fn test_get_segments_in_range() {
        let buffer = thirty_second_buffer(3);
        let from = Duration::from_secs(1);
        let to = Duration::from_secs(5);
        let hits = buffer.get_segments_in_range(from, to, REPR);
        assert_eq!(hits.len(), 3);
    }

    /// Clearing drops every segment and zeroes the byte total.
    #[test]
    fn test_clear() {
        let mut buffer = thirty_second_buffer(2);
        buffer.clear();
        assert_eq!(buffer.segment_count(), 0);
        assert_eq!(buffer.total_size(), 0);
    }

    /// Stats mirror the buffer state; a single short segment does not fill the window.
    #[test]
    fn test_dvr_stats() {
        let buffer = thirty_second_buffer(1);
        let stats = buffer.stats();
        assert_eq!(stats.segment_count, 1);
        assert!(stats.utilization() < 1.0);
    }

    /// The byte limit is honoured after inserting more data than it allows.
    #[test]
    fn test_max_size_constraint() {
        let mut buffer = DvrBuffer::with_max_size(Duration::from_secs(30), 2048);
        fill(&mut buffer, 3);
        assert!(buffer.total_size() <= 2048);
    }
}