use crate::error::{NetError, NetResult};
use std::time::{Duration, Instant};
/// Selects which set of timing parameters applies to a stream.
///
/// In this file `TimingCalculator::get_timing_params` dispatches on it and
/// `TimingValidator::new` derives the allowed packet deviation from it.
///
/// NOTE(review): the variant names resemble the SMPTE ST 2110-21 sender types
/// (narrow / wide / wide-linear) — confirm against the spec.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimingMode {
/// Uses the `narrow_*` calculations; the validator allows 100_000 ns of deviation.
Narrow,
/// Uses the `wide_*` calculations; the validator allows 1_000_000 ns of deviation.
Wide,
/// Treated exactly like `Wide` by the code in this file.
WideLinear,
}
/// How the packets of one frame are distributed over time.
///
/// Consumed by `TimingCalculator`, which uses it both for the narrow drain
/// time and for per-packet send times.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransmissionMode {
/// Packets are scheduled per video line: every packet that maps to the same
/// line index gets the same send time.
Gapped,
/// Packets are evenly spaced, one fixed interval apart.
Linear,
}
/// Scan structure of the video a packet or frame belongs to.
///
/// Not referenced by any other code in the visible part of this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanType {
/// Progressive scan.
Progressive,
/// NOTE(review): presumably the first field of an interlaced frame — confirm with callers.
InterlacedField1,
/// NOTE(review): presumably the second field of an interlaced frame — confirm with callers.
InterlacedField2,
}
/// Video frame rate stored as the exact rational `numerator / denominator`
/// frames per second (for example `30000 / 1001` for 29.97 fps).
///
/// Equality and hashing are structural: `FrameRate::new(30, 1)` and
/// `FrameRate::new(60, 2)` are *not* equal, because no reduction is performed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FrameRate {
    /// Frames elapsed in `denominator` seconds.
    pub numerator: u32,
    /// Number of seconds over which `numerator` frames elapse.
    pub denominator: u32,
}

impl FrameRate {
    /// Creates a frame rate of `numerator / denominator` frames per second.
    ///
    /// No validation is done; see the panic notes on [`FrameRate::frame_period_ns`].
    #[must_use]
    pub const fn new(numerator: u32, denominator: u32) -> Self {
        Self {
            numerator,
            denominator,
        }
    }

    /// Returns the rate in frames per second as a floating-point value.
    ///
    /// A zero denominator yields an infinite or NaN result rather than a panic.
    #[must_use]
    pub fn as_f64(&self) -> f64 {
        f64::from(self.numerator) / f64::from(self.denominator)
    }

    /// Returns the duration of one frame in nanoseconds, rounded down.
    ///
    /// For fractional rates the result is truncated (29.97 fps gives
    /// 33_366_666 ns, not 33_366_666.67), so multiplying it by a large frame
    /// count accumulates error.
    ///
    /// # Panics
    ///
    /// Panics with a division by zero if `numerator` is `0`.
    #[must_use]
    pub fn frame_period_ns(&self) -> u64 {
        // u32::MAX * 1e9 still fits in u64, so the product cannot overflow.
        (1_000_000_000u64 * u64::from(self.denominator)) / u64::from(self.numerator)
    }

    /// 24000/1001 fps (approximately 23.976).
    pub const FPS_23_976: Self = Self::new(24000, 1001);
    /// 24 fps.
    pub const FPS_24: Self = Self::new(24, 1);
    /// 25 fps.
    pub const FPS_25: Self = Self::new(25, 1);
    /// 30000/1001 fps (approximately 29.97).
    pub const FPS_29_97: Self = Self::new(30000, 1001);
    /// 30 fps.
    pub const FPS_30: Self = Self::new(30, 1);
    /// 50 fps.
    pub const FPS_50: Self = Self::new(50, 1);
    /// 60000/1001 fps (approximately 59.94).
    pub const FPS_59_94: Self = Self::new(60000, 1001);
    /// 60 fps.
    pub const FPS_60: Self = Self::new(60, 1);
    /// 100 fps.
    pub const FPS_100: Self = Self::new(100, 1);
    /// 120000/1001 fps (approximately 119.88).
    pub const FPS_119_88: Self = Self::new(120000, 1001);
    /// 120 fps.
    pub const FPS_120: Self = Self::new(120, 1);
}
/// Plain container for a pair of narrow-mode timing values in nanoseconds.
///
/// Not constructed or read by any code in the visible part of this file.
/// NOTE(review): presumably meant to carry the results of
/// `TimingCalculator::narrow_t_readiness` / `narrow_t_drain` — confirm with callers.
#[derive(Debug, Clone)]
pub struct NarrowTimingParams {
/// Readiness time, in nanoseconds.
pub t_readiness_ns: u64,
/// Drain time, in nanoseconds.
pub t_drain_ns: u64,
}
/// Plain container for a pair of wide-mode timing values in nanoseconds.
///
/// Not constructed or read by any code in the visible part of this file.
/// NOTE(review): presumably meant to carry the results of
/// `TimingCalculator::wide_t_readiness` / `wide_t_drain` — confirm with callers.
#[derive(Debug, Clone)]
pub struct WideTimingParams {
/// Readiness time, in nanoseconds.
pub t_readiness_ns: u64,
/// Drain time, in nanoseconds.
pub t_drain_ns: u64,
}
/// Computes when the packets of a video frame should be transmitted.
///
/// Every frame is split into a *readiness* interval (time from frame start
/// until the first packet) followed by a *drain* interval (time over which the
/// packets are sent). Both are derived from the frame period and the
/// configured [`TimingMode`] / [`TransmissionMode`].
#[derive(Debug, Clone)]
pub struct TimingCalculator {
    /// Selects the narrow or wide parameter set.
    mode: TimingMode,
    /// Selects per-line (gapped) or evenly spaced (linear) packet scheduling.
    transmission: TransmissionMode,
    /// Frame rate that defines the frame period.
    frame_rate: FrameRate,
    /// Number of active video lines per frame.
    active_lines: u32,
    /// Number of packets transmitted per frame.
    packets_per_frame: u32,
}

impl TimingCalculator {
    /// Creates a calculator; no validation is performed on the arguments.
    #[must_use]
    pub const fn new(
        mode: TimingMode,
        transmission: TransmissionMode,
        frame_rate: FrameRate,
        active_lines: u32,
        packets_per_frame: u32,
    ) -> Self {
        Self {
            mode,
            transmission,
            frame_rate,
            active_lines,
            packets_per_frame,
        }
    }

    /// Narrow-mode readiness time in nanoseconds: the part of the frame period
    /// that is not used for draining.
    #[must_use]
    pub fn narrow_t_readiness(&self) -> u64 {
        self.frame_rate
            .frame_period_ns()
            .saturating_sub(self.narrow_t_drain())
    }

    /// Narrow-mode drain time in nanoseconds; never larger than the frame period.
    ///
    /// The line time is the frame period divided by a nominal total line
    /// count: 625 when the frame-rate numerator is 25 or 50, otherwise 525.
    ///
    /// * Gapped: `active_lines` line times.
    /// * Linear: the frame period minus 43 line times.
    ///
    /// NOTE(review): the 525/625 totals look like SD raster sizes and are
    /// chosen from the numerator alone; HD/UHD rasters and rates such as
    /// 100 fps probably need different totals — confirm against the spec.
    #[must_use]
    pub fn narrow_t_drain(&self) -> u64 {
        let frame_period = self.frame_rate.frame_period_ns();
        let total_lines: u64 = match self.frame_rate.numerator {
            25 | 50 => 625,
            _ => 525,
        };
        let line_time = frame_period / total_lines;
        match self.transmission {
            // Clamp so that readiness + drain can never exceed one frame
            // period, even when active_lines is larger than the nominal total.
            TransmissionMode::Gapped => {
                (u64::from(self.active_lines) * line_time).min(frame_period)
            }
            TransmissionMode::Linear => frame_period.saturating_sub(43 * line_time),
        }
    }

    /// Wide-mode readiness time in nanoseconds.
    ///
    /// Always `0`, because the wide drain time equals the full frame period.
    #[must_use]
    pub fn wide_t_readiness(&self) -> u64 {
        self.frame_rate
            .frame_period_ns()
            .saturating_sub(self.wide_t_drain())
    }

    /// Wide-mode drain time in nanoseconds: the whole frame period.
    #[must_use]
    pub fn wide_t_drain(&self) -> u64 {
        self.frame_rate.frame_period_ns()
    }

    /// Returns `(t_readiness_ns, t_drain_ns)` for the configured timing mode.
    #[must_use]
    pub fn get_timing_params(&self) -> (u64, u64) {
        match self.mode {
            TimingMode::Narrow => (self.narrow_t_readiness(), self.narrow_t_drain()),
            TimingMode::Wide | TimingMode::WideLinear => {
                (self.wide_t_readiness(), self.wide_t_drain())
            }
        }
    }

    /// Returns the scheduled send time, in nanoseconds, of packet
    /// `packet_index` of the frame that starts at `frame_start_ns`.
    ///
    /// The first packet is sent `t_readiness` after the frame start and the
    /// remaining packets are spread over the drain interval:
    ///
    /// * Gapped: all packets of a video line share one send time; lines are
    ///   `t_drain / active_lines` apart.
    /// * Linear: packets are `t_drain / packets_per_frame` apart.
    ///
    /// Degenerate configurations do not panic: a zero `active_lines` or
    /// `packets_per_frame` is treated as `1`, and when there are fewer packets
    /// than lines each packet is placed on its own line. `packet_index` is not
    /// range-checked against `packets_per_frame`.
    #[must_use]
    pub fn packet_transmission_time(&self, packet_index: u32, frame_start_ns: u64) -> u64 {
        let (t_readiness, t_drain) = self.get_timing_params();
        let offset_in_drain = match self.transmission {
            TransmissionMode::Gapped => {
                let lines = self.active_lines.max(1);
                let packets_per_line = (self.packets_per_frame / lines).max(1);
                let line_index = packet_index / packets_per_line;
                // Spread lines over the drain interval (as the linear branch
                // does), not the whole frame period; otherwise the last lines
                // would be scheduled after the frame has ended.
                let line_time = t_drain / u64::from(lines);
                u64::from(line_index) * line_time
            }
            TransmissionMode::Linear => {
                let packet_interval = t_drain / u64::from(self.packets_per_frame.max(1));
                u64::from(packet_index) * packet_interval
            }
        };
        frame_start_ns + t_readiness + offset_in_drain
    }
}
/// Generates 32-bit RTP timestamps for a stream of video frames and converts
/// between nanosecond (PTP) time and RTP clock ticks.
///
/// RTP timestamps are modulo 2^32, so all results wrap.
#[derive(Debug, Clone)]
pub struct RtpTimestampGenerator {
    /// RTP media clock rate in ticks per second (for example 90_000).
    clock_rate: u32,
    /// Timestamp of the current frame.
    current_timestamp: u32,
    /// Frame rate used to derive the per-frame increment.
    frame_rate: FrameRate,
    /// Fraction of a tick (in units of `1 / frame_rate.numerator`) not yet
    /// applied to `current_timestamp`; carried over between frames so that
    /// fractional increments do not drift.
    tick_remainder: u64,
}

impl RtpTimestampGenerator {
    /// Creates a generator that starts at `initial_timestamp`.
    #[must_use]
    pub fn new(clock_rate: u32, frame_rate: FrameRate, initial_timestamp: u32) -> Self {
        Self {
            clock_rate,
            current_timestamp: initial_timestamp,
            frame_rate,
            tick_remainder: 0,
        }
    }

    /// Returns the timestamp of the current frame.
    #[must_use]
    pub const fn current(&self) -> u32 {
        self.current_timestamp
    }

    /// Advances the timestamp by one frame period, wrapping at 2^32.
    ///
    /// The exact increment is `clock_rate * denominator / numerator` ticks.
    /// When that is not an integer (for example 3753.75 ticks at 90 kHz and
    /// 24000/1001 fps) the fractional part is carried to the next call, so the
    /// long-term average is exact instead of losing the fraction every frame.
    ///
    /// # Panics
    ///
    /// Panics with a division by zero if the frame-rate numerator is `0`.
    pub fn advance_frame(&mut self) {
        let numerator = u64::from(self.frame_rate.numerator);
        // u32 * u32 plus a remainder smaller than a u32 always fits in u64.
        let scaled_ticks = u64::from(self.clock_rate) * u64::from(self.frame_rate.denominator)
            + self.tick_remainder;
        let increment = scaled_ticks / numerator;
        self.tick_remainder = scaled_ticks % numerator;
        // Truncating to u32 is intentional: the timestamp is modulo 2^32.
        self.current_timestamp = self.current_timestamp.wrapping_add(increment as u32);
    }

    /// Converts a time in nanoseconds to RTP ticks, modulo 2^32.
    ///
    /// The multiplication is done in 128-bit arithmetic because real PTP
    /// timestamps (around 1.7e18 ns) multiplied by the clock rate do not fit
    /// in 64 bits.
    #[must_use]
    pub fn ptp_to_rtp(&self, ptp_ns: u64) -> u32 {
        let ticks = u128::from(ptp_ns) * u128::from(self.clock_rate) / 1_000_000_000;
        // Truncating to u32 is intentional: the timestamp is modulo 2^32.
        ticks as u32
    }

    /// Converts an RTP timestamp to nanoseconds since the RTP clock's zero.
    ///
    /// Because the input is only 32 bits wide, the result is relative to the
    /// most recent wrap of the RTP clock, not an absolute PTP time.
    ///
    /// # Panics
    ///
    /// Panics with a division by zero if the clock rate is `0`.
    #[must_use]
    pub fn rtp_to_ptp(&self, rtp_ts: u32) -> u64 {
        // u32::MAX * 1e9 still fits in u64.
        (u64::from(rtp_ts) * 1_000_000_000) / u64::from(self.clock_rate)
    }
}
/// Bounded buffer that holds items until their playout time is reached.
///
/// Items are kept sorted by playout time; items with the same playout time
/// keep the order in which they were pushed.
#[derive(Debug)]
pub struct PlayoutBuffer<T> {
    /// `(playout_time_ns, item)` pairs, sorted by ascending playout time.
    buffer: Vec<(u64, T)>,
    /// Buffered time span, in nanoseconds, at which the buffer counts as ready.
    target_depth_ns: u64,
    /// Maximum number of items the buffer accepts.
    max_size: usize,
}

impl<T> PlayoutBuffer<T> {
    /// Creates an empty buffer with room for `max_size` items.
    ///
    /// Storage for `max_size` items is reserved up front.
    #[must_use]
    pub fn new(target_depth_ns: u64, max_size: usize) -> Self {
        Self {
            buffer: Vec::with_capacity(max_size),
            target_depth_ns,
            max_size,
        }
    }

    /// Inserts `item` so that it is released at `playout_time_ns`.
    ///
    /// The item is placed after every item already buffered with the same or
    /// an earlier playout time, so equal timestamps are released in arrival
    /// order.
    ///
    /// # Errors
    ///
    /// Returns a buffer error if the buffer already holds `max_size` items.
    pub fn push(&mut self, playout_time_ns: u64, item: T) -> NetResult<()> {
        if self.buffer.len() >= self.max_size {
            return Err(NetError::buffer("Playout buffer full"));
        }
        // A plain binary search returns an arbitrary index among equal keys;
        // partition_point always yields the position after the last of them.
        let pos = self
            .buffer
            .partition_point(|(time, _)| *time <= playout_time_ns);
        self.buffer.insert(pos, (playout_time_ns, item));
        Ok(())
    }

    /// Removes and returns every item whose playout time is at or before
    /// `current_time_ns`, in playout order.
    pub fn pop_ready(&mut self, current_time_ns: u64) -> Vec<T> {
        let ready_count = self
            .buffer
            .partition_point(|(time, _)| *time <= current_time_ns);
        // Draining the prefix in one go shifts the remaining items only once.
        self.buffer
            .drain(..ready_count)
            .map(|(_, item)| item)
            .collect()
    }

    /// Returns how far, in nanoseconds, the latest buffered playout time lies
    /// after `current_time_ns`; `0` if the buffer is empty or that time has
    /// already passed.
    #[must_use]
    pub fn depth_ns(&self, current_time_ns: u64) -> u64 {
        self.buffer
            .last()
            .map_or(0, |(time, _)| time.saturating_sub(current_time_ns))
    }

    /// Returns `true` once the buffered depth has reached the target depth.
    #[must_use]
    pub fn is_ready(&self, current_time_ns: u64) -> bool {
        self.depth_ns(current_time_ns) >= self.target_depth_ns
    }

    /// Returns the number of buffered items.
    #[must_use]
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Returns `true` if no items are buffered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Discards all buffered items.
    pub fn clear(&mut self) {
        self.buffer.clear();
    }
}
/// Paces transmission so that the average output rate follows a target bitrate.
///
/// The controller compares the number of bits recorded via
/// [`RateController::record_transmission`] with the number of bits the target
/// bitrate would have produced since construction (or the last
/// [`RateController::reset`]).
#[derive(Debug, Clone)]
pub struct RateController {
    /// Target rate in bits per second.
    target_bitrate: u64,
    /// Initialised to `0` and never read or updated by this type.
    current_time_ns: u64,
    /// Total bytes recorded since the start time.
    bytes_transmitted: u64,
    /// Reference instant from which elapsed time is measured.
    start_time: Instant,
}

impl RateController {
    /// Creates a controller for `target_bitrate` bits per second, starting now.
    #[must_use]
    pub fn new(target_bitrate: u64) -> Self {
        Self {
            target_bitrate,
            current_time_ns: 0,
            bytes_transmitted: 0,
            start_time: Instant::now(),
        }
    }

    /// Returns how long the caller should wait before sending a packet of
    /// `packet_size` bytes.
    ///
    /// * If fewer bits were sent than the target rate allows so far, the
    ///   result is zero.
    /// * Otherwise the result is the time remaining until the point at which
    ///   the already-sent bits plus this packet match the target rate.
    /// * A target bitrate of `0` disables pacing and always yields zero.
    ///
    /// All intermediate products use 128-bit arithmetic: in 64 bits,
    /// `elapsed_ns * bitrate` overflows after about 18 s at 1 Gbit/s.
    #[must_use]
    pub fn calculate_delay(&self, packet_size: usize) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.target_bitrate == 0 {
            return Duration::ZERO;
        }
        let bitrate = u128::from(self.target_bitrate);
        let elapsed_ns = self.start_time.elapsed().as_nanos();
        let bits_transmitted = u128::from(self.bytes_transmitted) * 8;
        let expected_bits = elapsed_ns * bitrate / NANOS_PER_SEC;
        if bits_transmitted < expected_bits {
            // Behind schedule: send immediately.
            return Duration::ZERO;
        }
        // usize -> u128 is lossless on every supported platform.
        let next_bits = bits_transmitted + (packet_size as u128) * 8;
        let next_time_ns = next_bits * NANOS_PER_SEC / bitrate;
        let delay_ns = next_time_ns.saturating_sub(elapsed_ns);
        // Clamp before narrowing; Duration::from_nanos takes a u64.
        Duration::from_nanos(delay_ns.min(u128::from(u64::MAX)) as u64)
    }

    /// Records that `packet_size` bytes have been transmitted.
    pub fn record_transmission(&mut self, packet_size: usize) {
        self.bytes_transmitted = self.bytes_transmitted.saturating_add(packet_size as u64);
    }

    /// Clears the byte counter and restarts the elapsed-time reference.
    pub fn reset(&mut self) {
        self.bytes_transmitted = 0;
        self.start_time = Instant::now();
    }
}
/// Snapshot of the local clock's PTP synchronisation status.
#[derive(Debug, Clone)]
pub struct SyncState {
    /// `true` if the most recent update reported an accuracy better than 1 ms.
    pub ptp_synced: bool,
    /// Offset, in nanoseconds, that is added to system time to obtain PTP time.
    pub ptp_offset_ns: i64,
    /// Accuracy reported with the most recent update, in nanoseconds.
    pub ptp_accuracy_ns: u64,
    /// When the state was created or last updated.
    pub last_sync_update: Instant,
}

impl SyncState {
    /// An update counts as "synced" only if its accuracy is strictly below this.
    const MAX_SYNCED_ACCURACY_NS: u64 = 1_000_000;
    /// A synced state older than this is no longer reported as synced.
    const STALE_AFTER: Duration = Duration::from_secs(10);

    /// Creates an unsynchronised state with zero offset, timestamped now.
    #[must_use]
    pub fn new() -> Self {
        Self {
            ptp_synced: false,
            ptp_offset_ns: 0,
            ptp_accuracy_ns: 0,
            last_sync_update: Instant::now(),
        }
    }

    /// Stores a new offset/accuracy measurement and refreshes the update time.
    ///
    /// The state is marked synced when `accuracy_ns` is below 1 ms.
    pub fn update_ptp(&mut self, offset_ns: i64, accuracy_ns: u64) {
        self.ptp_offset_ns = offset_ns;
        self.ptp_accuracy_ns = accuracy_ns;
        self.ptp_synced = accuracy_ns < Self::MAX_SYNCED_ACCURACY_NS;
        self.last_sync_update = Instant::now();
    }

    /// Returns `true` if the last update was accurate enough and is less than
    /// ten seconds old.
    #[must_use]
    pub fn is_synced(&self) -> bool {
        self.ptp_synced && self.last_sync_update.elapsed() < Self::STALE_AFTER
    }

    /// Applies the stored offset to `system_time_ns` and returns the PTP time
    /// in nanoseconds.
    ///
    /// The result saturates at `0` and `u64::MAX` instead of wrapping or
    /// panicking.
    #[must_use]
    pub fn current_ptp_time_ns(&self, system_time_ns: u64) -> u64 {
        let magnitude = self.ptp_offset_ns.unsigned_abs();
        if self.ptp_offset_ns >= 0 {
            system_time_ns.saturating_add(magnitude)
        } else {
            system_time_ns.saturating_sub(magnitude)
        }
    }
}

impl Default for SyncState {
    /// Equivalent to [`SyncState::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Maps PTP time onto frame boundaries that are aligned to a fixed epoch.
///
/// Frame `n` starts `n * denominator / numerator` seconds after the epoch.
/// All computations use the exact rational frame rate in 128-bit arithmetic,
/// so fractional rates such as 30000/1001 do not accumulate rounding error
/// however far the current time is from the epoch.
#[derive(Debug, Clone)]
pub struct FrameSyncPoint {
    /// Frame rate that defines the spacing of frame boundaries.
    frame_rate: FrameRate,
    /// PTP time, in nanoseconds, at which frame 0 starts.
    ptp_epoch_ns: u64,
}

impl FrameSyncPoint {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    /// Creates a sync point whose frame 0 starts at `ptp_epoch_ns`.
    #[must_use]
    pub const fn new(frame_rate: FrameRate, ptp_epoch_ns: u64) -> Self {
        Self {
            frame_rate,
            ptp_epoch_ns,
        }
    }

    /// Start time of frame `frame_number`: the first whole nanosecond at or
    /// after its exact boundary, saturating at `u64::MAX`.
    ///
    /// Rounding up (rather than down) guarantees that
    /// `frame_number(start_of_frame(n)) == n`.
    fn start_of_frame(&self, frame_number: u64) -> u64 {
        let numerator = u128::from(self.frame_rate.numerator);
        let scaled = u128::from(frame_number)
            * Self::NANOS_PER_SEC
            * u128::from(self.frame_rate.denominator);
        let offset_ns = scaled / numerator + u128::from(scaled % numerator != 0);
        let offset_ns = offset_ns.min(u128::from(u64::MAX)) as u64;
        self.ptp_epoch_ns.saturating_add(offset_ns)
    }

    /// Returns the start time of the frame that contains `ptp_time_ns`.
    ///
    /// Times before the epoch are treated as the epoch itself.
    ///
    /// # Panics
    ///
    /// Panics with a division by zero if the frame-rate numerator or
    /// denominator is `0`.
    #[must_use]
    pub fn frame_start_time(&self, ptp_time_ns: u64) -> u64 {
        self.start_of_frame(self.frame_number(ptp_time_ns))
    }

    /// Returns the start time of the frame after the one that contains
    /// `ptp_time_ns`.
    ///
    /// # Panics
    ///
    /// Panics with a division by zero if the frame-rate numerator or
    /// denominator is `0`.
    #[must_use]
    pub fn next_frame_start(&self, ptp_time_ns: u64) -> u64 {
        self.start_of_frame(self.frame_number(ptp_time_ns).saturating_add(1))
    }

    /// Returns the number of whole frames between the epoch and
    /// `ptp_time_ns` (`0` for times before the epoch).
    ///
    /// # Panics
    ///
    /// Panics with a division by zero if the frame-rate denominator is `0`.
    #[must_use]
    pub fn frame_number(&self, ptp_time_ns: u64) -> u64 {
        let elapsed_ns = u128::from(ptp_time_ns.saturating_sub(self.ptp_epoch_ns));
        let ns_per_rate_unit = Self::NANOS_PER_SEC * u128::from(self.frame_rate.denominator);
        let frames = elapsed_ns * u128::from(self.frame_rate.numerator) / ns_per_rate_unit;
        frames.min(u128::from(u64::MAX)) as u64
    }
}
/// Checks observed packet and frame times against their expected values.
#[derive(Debug)]
pub struct TimingValidator {
    /// Timing mode the packet tolerance was derived from.
    mode: TimingMode,
    /// Frame rate used to derive the frame tolerance.
    frame_rate: FrameRate,
    /// Largest accepted packet deviation, in nanoseconds.
    max_deviation_ns: u64,
}

impl TimingValidator {
    /// Creates a validator whose packet tolerance is 100_000 ns in narrow mode
    /// and 1_000_000 ns in both wide modes.
    #[must_use]
    pub fn new(mode: TimingMode, frame_rate: FrameRate) -> Self {
        let max_deviation_ns = match mode {
            TimingMode::Narrow => 100_000,
            TimingMode::Wide | TimingMode::WideLinear => 1_000_000,
        };
        Self {
            mode,
            frame_rate,
            max_deviation_ns,
        }
    }

    /// Verifies that a packet was seen within the packet tolerance of its
    /// expected time (early and late are treated alike).
    ///
    /// # Errors
    ///
    /// Returns a protocol error when the absolute deviation exceeds the
    /// tolerance.
    pub fn validate_packet_timing(
        &self,
        expected_time_ns: u64,
        actual_time_ns: u64,
    ) -> NetResult<()> {
        let later = expected_time_ns.max(actual_time_ns);
        let earlier = expected_time_ns.min(actual_time_ns);
        let deviation = later - earlier;
        if deviation <= self.max_deviation_ns {
            return Ok(());
        }
        Err(NetError::protocol(format!(
            "Timing violation: deviation {} ns exceeds limit {} ns",
            deviation, self.max_deviation_ns
        )))
    }

    /// Verifies that a frame started within one thousandth of a frame period
    /// of its expected start (early and late are treated alike).
    ///
    /// # Errors
    ///
    /// Returns a protocol error when the absolute deviation exceeds that limit.
    pub fn validate_frame_timing(
        &self,
        expected_frame_start_ns: u64,
        actual_frame_start_ns: u64,
    ) -> NetResult<()> {
        let max_frame_deviation = self.frame_rate.frame_period_ns() / 1000;
        let later = expected_frame_start_ns.max(actual_frame_start_ns);
        let earlier = expected_frame_start_ns.min(actual_frame_start_ns);
        let deviation = later - earlier;
        if deviation <= max_frame_deviation {
            return Ok(());
        }
        Err(NetError::protocol(format!(
            "Frame timing violation: deviation {} ns exceeds limit {} ns",
            deviation, max_frame_deviation
        )))
    }
}
// Unit tests for the timing helpers defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// 25 fps has a 40 ms frame period; 30000/1001 is approximately 29.97 fps.
#[test]
fn test_frame_rate() {
let fps_25 = FrameRate::FPS_25;
assert_eq!(fps_25.as_f64(), 25.0);
assert_eq!(fps_25.frame_period_ns(), 40_000_000);
let fps_29_97 = FrameRate::FPS_29_97;
assert!((fps_29_97.as_f64() - 29.97).abs() < 0.01);
}
// Narrow gapped timing: readiness and drain are both non-zero and together
// fit inside one frame period.
#[test]
fn test_timing_calculator_narrow() {
let calc = TimingCalculator::new(
TimingMode::Narrow,
TransmissionMode::Gapped,
FrameRate::FPS_25,
576,
576,
);
let t_readiness = calc.narrow_t_readiness();
let t_drain = calc.narrow_t_drain();
assert!(t_readiness > 0);
assert!(t_drain > 0);
assert!(t_readiness + t_drain <= FrameRate::FPS_25.frame_period_ns());
}
// A 90 kHz clock at 25 fps advances by 90_000 / 25 = 3600 ticks per frame.
#[test]
fn test_rtp_timestamp_generator() {
let mut gen = RtpTimestampGenerator::new(90000, FrameRate::FPS_25, 0);
let ts1 = gen.current();
gen.advance_frame();
let ts2 = gen.current();
assert_eq!(ts2 - ts1, 3600); }
// Items pushed out of order are released in playout-time order.
#[test]
fn test_playout_buffer() {
let mut buffer = PlayoutBuffer::new(1_000_000, 100);
buffer.push(100, "packet1").expect("should succeed in test");
buffer.push(200, "packet2").expect("should succeed in test");
buffer.push(150, "packet3").expect("should succeed in test");
// Only the item due at 100 is ready at time 125.
let ready = buffer.pop_ready(125);
assert_eq!(ready.len(), 1);
assert_eq!(ready[0], "packet1");
// The remaining two items (150 and 200) are ready at time 200.
let ready = buffer.pop_ready(200);
assert_eq!(ready.len(), 2);
}
// A fresh state is unsynced; an update with 100 µs accuracy marks it synced.
#[test]
fn test_sync_state() {
let mut state = SyncState::new();
assert!(!state.is_synced());
state.update_ptp(1000, 100_000);
assert!(state.is_synced());
}
// At 25 fps with epoch 0, time 45 ms lies in frame 1, which starts at 40 ms.
#[test]
fn test_frame_sync_point() {
let sync = FrameSyncPoint::new(FrameRate::FPS_25, 0);
let frame_start = sync.frame_start_time(45_000_000);
assert_eq!(frame_start, 40_000_000);
let frame_num = sync.frame_number(45_000_000);
assert_eq!(frame_num, 1);
}
}