use std::cell::RefCell;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
// Per-thread timestamp cache read by `create_packet_fast`.
// Each thread lazily builds its own `TimestampCache` on first access, so no
// cross-thread locking is involved; the `RefCell` supplies the mutable borrow
// that `TimestampCache::get_timestamp` (which takes `&mut self`) requires.
thread_local! {
static TIMESTAMP_CACHE: RefCell<TimestampCache> = RefCell::new(TimestampCache::new());
}
/// Wall-clock timestamp source that re-reads the system clock only periodically.
///
/// Between refreshes, timestamps are extrapolated from the last wall-clock
/// reading using the monotonic time elapsed since that reading was taken.
#[derive(Debug)]
struct TimestampCache {
    /// Microseconds since the Unix epoch at the most recent wall-clock read.
    cached_timestamp_us: u64,
    /// Monotonic instant captured right after the most recent wall-clock read.
    last_update: Instant,
    /// Age (in microseconds) the cached reading must reach before it is refreshed.
    update_interval_us: u64,
}

impl TimestampCache {
    /// Creates a cache seeded with the current wall-clock time and a
    /// 1000 µs refresh interval.
    ///
    /// # Panics
    /// Panics if the system clock reports a time before the Unix epoch.
    fn new() -> Self {
        // Read the wall clock first, then anchor the monotonic clock to it.
        let cached_timestamp_us = Self::read_wall_clock_us();
        let last_update = Instant::now();
        Self {
            cached_timestamp_us,
            last_update,
            update_interval_us: 1000,
        }
    }

    /// Returns a timestamp in microseconds since the Unix epoch.
    ///
    /// While the cached reading is younger than `update_interval_us`, the
    /// result is the cached value plus the monotonic time elapsed since it was
    /// taken. Otherwise the wall clock is re-read, the cache is re-anchored,
    /// and the fresh reading is returned.
    ///
    /// # Panics
    /// Panics if a refresh is needed and the system clock reports a time
    /// before the Unix epoch.
    fn get_timestamp(&mut self) -> u64 {
        let since_refresh_us = self.last_update.elapsed().as_micros() as u64;

        // Fast path: the cached reading is still fresh enough to extrapolate from.
        if since_refresh_us < self.update_interval_us {
            return self.cached_timestamp_us + since_refresh_us;
        }

        // Slow path: take a new wall-clock reading and re-anchor the monotonic clock.
        self.cached_timestamp_us = Self::read_wall_clock_us();
        self.last_update = Instant::now();
        self.cached_timestamp_us
    }

    /// Reads the system clock as microseconds since the Unix epoch.
    fn read_wall_clock_us() -> u64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        // `as_micros` yields a `u128`; the cast keeps the low 64 bits.
        since_epoch.as_micros() as u64
    }
}
/// Magic value written at the start of every serialized header.
///
/// `UdpPacketHeader::from_bytes` rejects buffers that do not begin with it.
/// The four bytes spell ASCII `"RPF3"`.
const RPERF3_UDP_MAGIC: u32 = 0x52504633;

/// Fixed-size header placed at the front of each UDP packet.
///
/// Serialized as 20 big-endian bytes: magic (4), sequence (8), timestamp (8).
/// The type is a plain value: it can be copied, compared and hashed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UdpPacketHeader {
    /// Magic value; the constructors set it to `RPERF3_UDP_MAGIC`.
    pub magic: u32,
    /// Caller-supplied sequence number.
    pub sequence: u64,
    /// Caller-supplied timestamp; `with_current_time` stores microseconds
    /// since the Unix epoch here.
    pub timestamp_us: u64,
}

impl UdpPacketHeader {
    /// Serialized size in bytes: 4 (magic) + 8 (sequence) + 8 (timestamp).
    pub const SIZE: usize = 20;

    /// Builds a header with the given sequence number and timestamp.
    pub fn new(sequence: u64, timestamp_us: u64) -> Self {
        Self {
            magic: RPERF3_UDP_MAGIC,
            sequence,
            timestamp_us,
        }
    }

    /// Builds a header stamped with the current wall-clock time, expressed as
    /// microseconds since the Unix epoch.
    ///
    /// # Panics
    /// Panics if the system clock reports a time before the Unix epoch.
    pub fn with_current_time(sequence: u64) -> Self {
        let timestamp_us = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            // `as_micros` yields a `u128`; the cast keeps the low 64 bits.
            .as_micros() as u64;
        Self::new(sequence, timestamp_us)
    }

    /// Serializes the header into its big-endian wire representation.
    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
        let mut bytes = [0u8; Self::SIZE];
        bytes[0..4].copy_from_slice(&self.magic.to_be_bytes());
        bytes[4..12].copy_from_slice(&self.sequence.to_be_bytes());
        bytes[12..20].copy_from_slice(&self.timestamp_us.to_be_bytes());
        bytes
    }

    /// Parses a header from the first `SIZE` bytes of `bytes`.
    ///
    /// Returns `None` when `bytes` is shorter than `SIZE` or when the leading
    /// four bytes are not `RPERF3_UDP_MAGIC`. Bytes beyond `SIZE` are ignored.
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::SIZE {
            return None;
        }

        // Check the magic before decoding the remaining fields.
        let magic = u32::from_be_bytes(bytes[0..4].try_into().ok()?);
        if magic != RPERF3_UDP_MAGIC {
            return None;
        }

        let sequence = u64::from_be_bytes(bytes[4..12].try_into().ok()?);
        let timestamp_us = u64::from_be_bytes(bytes[12..20].try_into().ok()?);
        Some(Self {
            magic,
            sequence,
            timestamp_us,
        })
    }
}
/// Builds a UDP packet consisting of a header stamped with the current
/// wall-clock time, followed by `payload_size` zero bytes.
///
/// The wall clock is read on every call via
/// `UdpPacketHeader::with_current_time`, which panics if the system clock
/// reports a time before the Unix epoch.
pub fn create_packet(sequence: u64, payload_size: usize) -> Vec<u8> {
    let header_bytes = UdpPacketHeader::with_current_time(sequence).to_bytes();
    let total_len = UdpPacketHeader::SIZE + payload_size;

    // Allocate once for the whole packet, write the header, then zero-fill
    // the payload by growing to the final length.
    let mut packet: Vec<u8> = Vec::with_capacity(total_len);
    packet.extend_from_slice(&header_bytes);
    packet.resize(total_len, 0);
    packet
}
/// Builds a UDP packet like `create_packet`, but takes the timestamp from
/// this thread's `TIMESTAMP_CACHE` instead of reading the wall clock directly.
///
/// The packet is a header followed by `payload_size` zero bytes.
pub fn create_packet_fast(sequence: u64, payload_size: usize) -> Vec<u8> {
    let timestamp_us = TIMESTAMP_CACHE.with(|slot| {
        // The mutable borrow is confined to this closure.
        let mut cache = slot.borrow_mut();
        cache.get_timestamp()
    });
    let header_bytes = UdpPacketHeader::new(sequence, timestamp_us).to_bytes();
    let total_len = UdpPacketHeader::SIZE + payload_size;

    // Allocate once for the whole packet, write the header, then zero-fill
    // the payload by growing to the final length.
    let mut packet: Vec<u8> = Vec::with_capacity(total_len);
    packet.extend_from_slice(&header_bytes);
    packet.resize(total_len, 0);
    packet
}
/// Splits a received packet into its decoded header and the payload bytes
/// that follow it.
///
/// Returns `None` when `UdpPacketHeader::from_bytes` rejects the buffer.
/// The payload is a borrowed sub-slice of `packet` starting at
/// `UdpPacketHeader::SIZE`.
pub fn parse_packet(packet: &[u8]) -> Option<(UdpPacketHeader, &[u8])> {
    // A successful header parse implies `packet` holds at least `SIZE` bytes,
    // so the slice below cannot go out of bounds.
    UdpPacketHeader::from_bytes(packet)
        .map(|header| (header, &packet[UdpPacketHeader::SIZE..]))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A header survives a serialize/parse round trip with all fields intact.
    #[test]
    fn test_header_serialization() {
        let original = UdpPacketHeader::new(42, 1234567890);
        let wire = original.to_bytes();

        let decoded = UdpPacketHeader::from_bytes(&wire).expect("Failed to parse header");

        assert_eq!(decoded.magic, RPERF3_UDP_MAGIC);
        assert_eq!(decoded.sequence, 42);
        assert_eq!(decoded.timestamp_us, 1234567890);
    }

    /// A full-length buffer that starts with the wrong magic is rejected.
    #[test]
    fn test_invalid_magic() {
        let mut raw = [0u8; UdpPacketHeader::SIZE];
        raw[0..4].copy_from_slice(&0x12345678u32.to_be_bytes());

        assert!(UdpPacketHeader::from_bytes(&raw).is_none());
    }

    /// `create_packet` emits header + payload and the result parses back.
    #[test]
    fn test_packet_creation() {
        let wire = create_packet(100, 1024);
        assert_eq!(wire.len(), UdpPacketHeader::SIZE + 1024);

        let parsed = parse_packet(&wire).expect("Failed to parse packet");
        let (header, payload) = parsed;
        assert_eq!(header.sequence, 100);
        assert_eq!(payload.len(), 1024);
    }

    /// Buffers shorter than a header cannot be parsed.
    #[test]
    fn test_short_packet() {
        let truncated = vec![0u8; 10];
        assert!(parse_packet(&truncated).is_none());
    }

    /// `create_packet_fast` emits a parseable packet whose timestamp is not
    /// more than ten seconds behind the wall clock.
    #[test]
    fn test_packet_creation_fast() {
        let wire = create_packet_fast(200, 1024);
        assert_eq!(wire.len(), UdpPacketHeader::SIZE + 1024);

        let parsed = parse_packet(&wire).expect("Failed to parse packet");
        let (header, payload) = parsed;
        assert_eq!(header.sequence, 200);
        assert_eq!(payload.len(), 1024);

        let wall_clock_us = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_micros() as u64;
        // Saturating: a timestamp at or ahead of the wall clock counts as zero lag.
        let lag_us = wall_clock_us.saturating_sub(header.timestamp_us);
        assert!(lag_us < 10_000_000, "Timestamp too far in past");
    }

    /// Two back-to-back cached timestamps are less than 10 ms apart.
    #[test]
    fn test_timestamp_cache_consistency() {
        let first_wire = create_packet_fast(1, 100);
        let second_wire = create_packet_fast(2, 100);

        let (first, _) = parse_packet(&first_wire).unwrap();
        let (second, _) = parse_packet(&second_wire).unwrap();

        // Saturating: a second timestamp earlier than the first counts as zero gap.
        let gap_us = second.timestamp_us.saturating_sub(first.timestamp_us);
        assert!(gap_us < 10_000, "Timestamps differ by more than 10ms");
    }
}