use super::{MediaPacket, MediaType};
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct DvrConfig {
pub window_duration: Duration,
pub segment_duration: Duration,
}
impl Default for DvrConfig {
fn default() -> Self {
Self {
window_duration: Duration::from_secs(3600), segment_duration: Duration::from_secs(2),
}
}
}
#[derive(Debug, Clone)]
pub struct DvrSegment {
pub start_timestamp: u64,
pub end_timestamp: u64,
pub video_packets: Vec<MediaPacket>,
pub audio_packets: Vec<MediaPacket>,
pub metadata_packets: Vec<MediaPacket>,
}
impl DvrSegment {
#[must_use]
pub fn new(start_timestamp: u64) -> Self {
Self {
start_timestamp,
end_timestamp: start_timestamp,
video_packets: Vec::new(),
audio_packets: Vec::new(),
metadata_packets: Vec::new(),
}
}
pub fn add_packet(&mut self, packet: MediaPacket) {
self.end_timestamp = self.end_timestamp.max(packet.timestamp);
match packet.media_type {
MediaType::Video => self.video_packets.push(packet),
MediaType::Audio => self.audio_packets.push(packet),
MediaType::Metadata => self.metadata_packets.push(packet),
}
}
#[must_use]
pub fn duration(&self) -> u64 {
self.end_timestamp.saturating_sub(self.start_timestamp)
}
#[must_use]
pub fn packet_count(&self) -> usize {
self.video_packets.len() + self.audio_packets.len() + self.metadata_packets.len()
}
#[must_use]
pub fn size_bytes(&self) -> usize {
self.video_packets
.iter()
.map(|p| p.data.len())
.sum::<usize>()
+ self
.audio_packets
.iter()
.map(|p| p.data.len())
.sum::<usize>()
+ self
.metadata_packets
.iter()
.map(|p| p.data.len())
.sum::<usize>()
}
#[must_use]
pub fn contains_timestamp(&self, timestamp: u64) -> bool {
timestamp >= self.start_timestamp && timestamp <= self.end_timestamp
}
}
pub struct DvrBuffer {
config: DvrConfig,
segments: VecDeque<Arc<RwLock<DvrSegment>>>,
current_segment: Option<Arc<RwLock<DvrSegment>>>,
current_segment_start: u64,
total_packets: usize,
total_bytes: usize,
}
impl DvrBuffer {
#[must_use]
pub fn new(config: DvrConfig) -> Self {
Self {
config,
segments: VecDeque::new(),
current_segment: None,
current_segment_start: 0,
total_packets: 0,
total_bytes: 0,
}
}
pub fn add_packet(&mut self, packet: MediaPacket) {
let segment_duration_ms = self.config.segment_duration.as_millis() as u64;
let need_new_segment = if let Some(_current) = &self.current_segment {
let elapsed = packet.timestamp.saturating_sub(self.current_segment_start);
elapsed >= segment_duration_ms
} else {
true
};
if need_new_segment {
self.finalize_current_segment();
self.current_segment_start = packet.timestamp;
let new_segment = Arc::new(RwLock::new(DvrSegment::new(packet.timestamp)));
self.current_segment = Some(new_segment);
}
if let Some(segment) = &self.current_segment {
let mut seg = segment.write();
self.total_bytes += packet.data.len();
self.total_packets += 1;
seg.add_packet(packet);
}
self.trim_old_segments();
}
fn finalize_current_segment(&mut self) {
if let Some(segment) = self.current_segment.take() {
self.segments.push_back(segment);
}
}
fn trim_old_segments(&mut self) {
if self.segments.is_empty() {
return;
}
let window_ms = self.config.window_duration.as_millis() as u64;
let latest_timestamp = if let Some(current) = &self.current_segment {
let seg = current.read();
seg.end_timestamp
} else if let Some(last) = self.segments.back() {
let seg = last.read();
seg.end_timestamp
} else {
return;
};
let cutoff_timestamp = latest_timestamp.saturating_sub(window_ms);
while let Some(first) = self.segments.front() {
let seg = first.read();
if seg.end_timestamp < cutoff_timestamp {
self.total_packets -= seg.packet_count();
self.total_bytes -= seg.size_bytes();
drop(seg);
self.segments.pop_front();
} else {
break;
}
}
}
#[must_use]
pub fn get_packets_in_range(
&self,
start_timestamp: u64,
end_timestamp: u64,
) -> Vec<MediaPacket> {
let mut packets = Vec::new();
for segment in &self.segments {
let seg = segment.read();
if seg.end_timestamp < start_timestamp {
continue;
}
if seg.start_timestamp > end_timestamp {
break;
}
for packet in &seg.video_packets {
if packet.timestamp >= start_timestamp && packet.timestamp <= end_timestamp {
packets.push(packet.clone());
}
}
for packet in &seg.audio_packets {
if packet.timestamp >= start_timestamp && packet.timestamp <= end_timestamp {
packets.push(packet.clone());
}
}
}
packets.sort_by_key(|p| p.timestamp);
packets
}
#[must_use]
pub fn get_packets_by_type(&self, media_type: MediaType) -> Vec<MediaPacket> {
let mut packets = Vec::new();
for segment in &self.segments {
let seg = segment.read();
let segment_packets = match media_type {
MediaType::Video => &seg.video_packets,
MediaType::Audio => &seg.audio_packets,
MediaType::Metadata => &seg.metadata_packets,
};
packets.extend(segment_packets.iter().cloned());
}
packets
}
#[must_use]
pub fn earliest_timestamp(&self) -> Option<u64> {
self.segments.front().map(|seg| seg.read().start_timestamp)
}
#[must_use]
pub fn latest_timestamp(&self) -> Option<u64> {
if let Some(current) = &self.current_segment {
return Some(current.read().end_timestamp);
}
self.segments.back().map(|seg| seg.read().end_timestamp)
}
#[must_use]
pub fn buffered_duration(&self) -> Duration {
if let (Some(earliest), Some(latest)) = (self.earliest_timestamp(), self.latest_timestamp())
{
let duration_ms = latest.saturating_sub(earliest);
Duration::from_millis(duration_ms)
} else {
Duration::ZERO
}
}
#[must_use]
pub fn segment_count(&self) -> usize {
self.segments.len()
}
#[must_use]
pub fn packet_count(&self) -> usize {
self.total_packets
}
#[must_use]
pub fn total_bytes(&self) -> usize {
self.total_bytes
}
pub fn clear(&mut self) {
self.segments.clear();
self.current_segment = None;
self.total_packets = 0;
self.total_bytes = 0;
}
#[must_use]
pub fn stats(&self) -> DvrStats {
DvrStats {
segment_count: self.segment_count(),
packet_count: self.packet_count(),
total_bytes: self.total_bytes(),
buffered_duration: self.buffered_duration(),
earliest_timestamp: self.earliest_timestamp(),
latest_timestamp: self.latest_timestamp(),
}
}
}
#[derive(Debug, Clone)]
pub struct DvrStats {
pub segment_count: usize,
pub packet_count: usize,
pub total_bytes: usize,
pub buffered_duration: Duration,
pub earliest_timestamp: Option<u64>,
pub latest_timestamp: Option<u64>,
}
pub struct DvrPlayback {
buffer: Arc<RwLock<DvrBuffer>>,
position: u64,
speed: f64,
paused: bool,
}
impl DvrPlayback {
#[must_use]
pub fn new(buffer: Arc<RwLock<DvrBuffer>>, start_position: u64) -> Self {
Self {
buffer,
position: start_position,
speed: 1.0,
paused: false,
}
}
pub fn seek(&mut self, timestamp: u64) -> bool {
let buf = self.buffer.read();
if let (Some(earliest), Some(latest)) = (buf.earliest_timestamp(), buf.latest_timestamp()) {
if timestamp >= earliest && timestamp <= latest {
self.position = timestamp;
return true;
}
}
false
}
pub fn seek_to_live(&mut self) {
let buf = self.buffer.read();
if let Some(latest) = buf.latest_timestamp() {
self.position = latest;
}
}
pub fn pause(&mut self) {
self.paused = true;
}
pub fn resume(&mut self) {
self.paused = false;
}
pub fn set_speed(&mut self, speed: f64) {
self.speed = speed.max(0.25).min(4.0);
}
#[must_use]
pub fn next_packet(&mut self) -> Option<MediaPacket> {
if self.paused {
return None;
}
let buf = self.buffer.read();
let packets = buf.get_packets_in_range(self.position, self.position + 1000);
if let Some(packet) = packets.first() {
self.position = packet.timestamp + packet.duration;
Some(packet.clone())
} else {
None
}
}
#[must_use]
pub fn is_at_live(&self) -> bool {
let buf = self.buffer.read();
if let Some(latest) = buf.latest_timestamp() {
latest.saturating_sub(self.position) < 5000 } else {
false
}
}
#[must_use]
pub const fn position(&self) -> u64 {
self.position
}
#[must_use]
pub const fn speed(&self) -> f64 {
self.speed
}
#[must_use]
pub const fn is_paused(&self) -> bool {
self.paused
}
}