use crate::{Frame, Loss};
use anyhow::Result;
use clap::Parser;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use socket2::{Domain, Protocol, Socket, Type};
use std::{
fs::File,
io::{BufReader, ErrorKind, Read, Seek},
net::{Ipv4Addr, SocketAddr},
time::Duration,
};
// Command-line options selecting and configuring the data source.
//
// `Source::new` picks the first source that is set, in this order:
// `noise`, `file`, `single`; if none is set it listens on UDP `ip:port`.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
//
// NOTE(review): `file` and `frame_size` both derive the short flag `-f`
// from `#[arg(short)]`; clap's debug assertions reject duplicate short
// flags — confirm and give one of them an explicit, distinct short.
#[derive(Parser, Debug, Clone)]
pub struct SourceOpts {
// Local IPv4 address to bind the UDP socket to. If it is a multicast
// address the socket also joins that group.
#[arg(short, long, default_value = "0.0.0.0")]
ip: std::net::Ipv4Addr,
// UDP port to bind to.
#[arg(short, long, default_value_t = 9293)]
port: u16,
// Path of a file that is read as a sequence of fixed-size frames
// (`frame_size` bytes each).
#[arg(short, long)]
file: Option<String>,
// Number of bytes read per frame in `file` mode.
#[arg(short, long, default_value_t = 8 + 30 * 2 * 6 * 4)]
frame_size: usize,
// In `file` and `single` mode: seek back to the start on end of file
// instead of stopping.
#[arg(short, long)]
repeat: bool,
// Path of a file that is read as raw little-endian `f32` samples and
// returned as a single trace.
#[arg(short, long)]
single: Option<String>,
// Generate synthetic noise instead of reading data. The magnitude is the
// number of filter stages applied to each sample; a positive value selects
// differencing stages, zero or negative selects accumulating stages.
#[arg(short, long)]
noise: Option<i32>,
}
/// The concrete backend a [`Source`] pulls samples from.
#[derive(Debug)]
enum Data {
/// Datagrams received on a bound UDP socket, one frame per datagram.
Udp(Socket),
/// Buffered file read as consecutive fixed-size frames.
File(BufReader<File>),
/// Buffered file read as raw little-endian `f32` samples.
Single(BufReader<File>),
/// Seeded random generator plus one `f64` state value per filter stage.
Noise((SmallRng, Vec<f64>)),
}
/// A stream of sample traces backed by UDP, a file, or a noise generator.
pub struct Source {
/// The options this source was built from; consulted on every `get`.
opts: SourceOpts,
/// The active backend, chosen once in `new`.
data: Data,
/// Loss tracker updated with every frame decoded from UDP or a frame file.
loss: Loss,
}
impl Source {
    /// Builds a source from the given options.
    ///
    /// The first option that is set wins, in this order: `noise`, `file`,
    /// `single`. If none is set, a UDP socket is bound to `ip:port` (joining
    /// the multicast group when `ip` is a multicast address) with a one second
    /// read timeout.
    ///
    /// # Errors
    ///
    /// Returns an error if the input file cannot be opened or if creating,
    /// configuring or binding the socket fails.
    pub fn new(opts: SourceOpts) -> Result<Self> {
        let data = if let Some(noise) = opts.noise {
            Data::Noise((
                // Fixed seed: the noise sequence is reproducible between runs.
                SmallRng::seed_from_u64(0x7654321),
                // One state variable per filter stage.
                vec![0.0; noise.unsigned_abs() as usize],
            ))
        } else if let Some(file) = &opts.file {
            Data::File(BufReader::with_capacity(1 << 20, File::open(file)?))
        } else if let Some(single) = &opts.single {
            Data::Single(BufReader::with_capacity(1 << 20, File::open(single)?))
        } else {
            log::info!("Binding to {}:{}", opts.ip, opts.port);
            let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
            socket.set_read_timeout(Some(Duration::from_millis(1000)))?;
            socket.set_recv_buffer_size(1 << 20)?;
            socket.set_reuse_address(true)?;
            if opts.ip.is_multicast() {
                socket.join_multicast_v4(&opts.ip, &Ipv4Addr::UNSPECIFIED)?;
            }
            socket.bind(&SocketAddr::new(opts.ip.into(), opts.port).into())?;
            Data::Udp(socket)
        };
        Ok(Self {
            opts,
            data,
            loss: Loss::default(),
        })
    }

    /// Fetches the next batch of traces from the underlying backend.
    ///
    /// * Noise: one trace of 1024 synthetic samples.
    /// * File: the traces of the next `frame_size`-byte frame. With `repeat`
    ///   the file is rewound once on end of file.
    /// * Single: one trace of up to 512 little-endian `f32` samples. At end of
    ///   file the trace is empty unless `repeat` rewinds the file.
    /// * UDP: the traces of the next received datagram.
    ///
    /// # Errors
    ///
    /// Returns an error on I/O failure (including a UDP receive that fails
    /// after the read timeout configured in [`Source::new`]), on end of file
    /// in file mode when not repeating or when the file holds less than one
    /// frame, and when a frame cannot be decoded.
    pub fn get(&mut self) -> Result<Vec<Vec<f32>>> {
        Ok(match &mut self.data {
            Data::Noise((rng, state)) => {
                // `new` only creates this variant when `noise` is set, so a
                // missing value simply selects the accumulating branch.
                let diff = matches!(self.opts.noise, Some(n) if n > 0);
                let scale = 6.0f64.sqrt();
                let trace: Vec<f32> = rng
                    .sample_iter(rand::distributions::Open01)
                    .take(1024)
                    .map(|u: f64| {
                        // Centre the uniform (0, 1) sample on zero before
                        // scaling (variance 6/12 = 0.5). A non-zero mean would
                        // be summed by the accumulator stages below and grow
                        // without bound.
                        let mut x = (u - 0.5) * scale;
                        for s in state.iter_mut() {
                            // Each stage is either a first difference or a
                            // running sum that outputs its previous total.
                            (x, *s) = if diff { (x - *s, x) } else { (*s, x + *s) };
                        }
                        x as f32
                    })
                    .collect();
                vec![trace]
            }
            Data::File(fil) => {
                // Sized from the option rather than a fixed 2048-byte array so
                // that a large `frame_size` cannot index out of bounds.
                let mut buf = vec![0u8; self.opts.frame_size];
                // Rewind at most once per call: a file shorter than one frame
                // would otherwise loop forever with `repeat`.
                let mut rewound = false;
                loop {
                    match fil.read_exact(&mut buf) {
                        Ok(()) => {
                            let frame = Frame::from_bytes(&buf[..])?;
                            self.loss.update(&frame);
                            break frame.data.traces().into();
                        }
                        Err(e)
                            if e.kind() == ErrorKind::UnexpectedEof
                                && self.opts.repeat
                                && !rewound =>
                        {
                            fil.seek(std::io::SeekFrom::Start(0))?;
                            rewound = true;
                        }
                        Err(e) => return Err(e.into()),
                    }
                }
            }
            Data::Single(fil) => {
                let mut buf = [0u8; 2048];
                // Rewind at most once per call so an empty file cannot spin.
                let mut rewound = false;
                let len = loop {
                    let mut len = fil.read(&mut buf)?;
                    if len == 0 && self.opts.repeat && !rewound {
                        fil.seek(std::io::SeekFrom::Start(0))?;
                        rewound = true;
                        continue;
                    }
                    // A short read may end in the middle of a sample. Top it
                    // up to a whole `f32` so no bytes are dropped and later
                    // samples stay aligned. `buf.len()` is a multiple of 4, so
                    // `buf[len..]` is never empty here.
                    while len % 4 != 0 {
                        match fil.read(&mut buf[len..])? {
                            0 => break, // end of file: trailing bytes are ignored
                            n => len += n,
                        }
                    }
                    break len;
                };
                vec![buf[..len]
                    .chunks_exact(4)
                    .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
                    .collect()]
            }
            Data::Udp(socket) => {
                let mut buf = [0u8; 2048];
                // `Socket` implements `std::io::Read` on top of `recv`, which
                // avoids transmuting the buffer to `MaybeUninit` by hand.
                let len = socket.read(&mut buf)?;
                let frame = Frame::from_bytes(&buf[..len])?;
                self.loss.update(&frame);
                frame.data.traces().into()
            }
        })
    }

    /// Runs the loss tracker's analysis over the frames seen so far.
    pub fn finish(&self) {
        self.loss.analyze()
    }
}