stabilizer_stream/
source.rs1use crate::{Frame, Loss};
2use anyhow::Result;
3use clap::Parser;
4use rand::{rngs::SmallRng, Rng, SeedableRng};
5use socket2::{Domain, Protocol, Socket, Type};
6use std::{
7 fs::File,
8 io::{BufReader, ErrorKind, Read, Seek},
9 net::{Ipv4Addr, SocketAddr},
10 time::Duration,
11};
12
13#[derive(Parser, Debug, Clone)]
15pub struct SourceOpts {
16 #[arg(short, long, default_value = "0.0.0.0")]
18 ip: std::net::Ipv4Addr,
19
20 #[arg(short, long, default_value_t = 9293)]
22 port: u16,
23
24 #[arg(short, long)]
26 file: Option<String>,
27
28 #[arg(short, long, default_value_t = 8 + 30 * 2 * 6 * 4)]
30 frame_size: usize,
31
32 #[arg(short, long)]
34 repeat: bool,
35
36 #[arg(short, long)]
38 single: Option<String>,
39
40 #[arg(short, long)]
42 noise: Option<i32>,
43}
44
45#[derive(Debug)]
46enum Data {
47 Udp(Socket),
48 File(BufReader<File>),
49 Single(BufReader<File>),
50 Noise((SmallRng, bool, Vec<f32>)),
51}
52
53pub struct Source {
54 opts: SourceOpts,
55 data: Data,
56 loss: Loss,
57}
58
59impl Source {
60 pub fn new(opts: SourceOpts) -> Result<Self> {
61 let data = if let Some(noise) = opts.noise {
62 Data::Noise((
63 SmallRng::seed_from_u64(0x7654321),
64 noise > 0,
65 vec![0.0; noise.unsigned_abs() as _],
66 ))
67 } else if let Some(file) = &opts.file {
68 Data::File(BufReader::with_capacity(1 << 20, File::open(file)?))
69 } else if let Some(single) = &opts.single {
70 Data::Single(BufReader::with_capacity(1 << 20, File::open(single)?))
71 } else {
72 log::info!("Binding to {}:{}", opts.ip, opts.port);
73 let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
74 socket.set_read_timeout(Some(Duration::from_millis(1000)))?;
75 socket.set_recv_buffer_size(1 << 20)?;
76 socket.set_reuse_address(true)?;
77 if opts.ip.is_multicast() {
78 socket.join_multicast_v4(&opts.ip, &Ipv4Addr::UNSPECIFIED)?;
79 }
80 socket.bind(&SocketAddr::new(opts.ip.into(), opts.port).into())?;
81 Data::Udp(socket)
82 };
83 Ok(Self {
84 opts,
85 data,
86 loss: Loss::default(),
87 })
88 }
89
90 pub fn get(&mut self) -> Result<Vec<Vec<f32>>> {
91 Ok(match &mut self.data {
92 Data::Noise((rng, diff, state)) => {
93 vec![rng
94 .sample_iter(rand::distributions::Open01)
95 .map(|mut x| {
96 x = (x - 0.5) * 12.0f32.sqrt(); state.iter_mut().fold(x, |mut x, s| {
98 (x, *s) = if *diff { (x - *s, x) } else { (*s, x + *s) };
99 x
100 })
101 })
102 .take(4096)
103 .collect()]
104 }
105 Data::File(fil) => loop {
106 let mut buf = [0u8; 2048];
107 match fil.read_exact(&mut buf[..self.opts.frame_size]) {
108 Ok(()) => {
109 let frame = Frame::from_bytes(&buf[..self.opts.frame_size])?;
110 self.loss.update(&frame);
111 break frame.data.traces().into();
112 }
113 Err(e) if e.kind() == ErrorKind::UnexpectedEof && self.opts.repeat => {
114 fil.seek(std::io::SeekFrom::Start(0))?;
115 }
116 Err(e) => Err(e)?,
117 }
118 },
119 Data::Single(fil) => loop {
120 let mut buf = [0u8; 2048];
121 match fil.read(&mut buf[..]) {
122 Ok(len) => {
123 if len == 0 && self.opts.repeat {
124 fil.seek(std::io::SeekFrom::Start(0))?;
125 continue;
126 }
127 let v: &[[u8; 4]] = bytemuck::cast_slice(&buf[..len / 4 * 4]);
128 break vec![v.iter().map(|b| f32::from_le_bytes(*b)).collect()];
129 }
130 Err(e) => Err(e)?,
131 }
132 },
133 Data::Udp(socket) => {
134 let mut buf = [0u8; 2048];
135 let len = socket.recv(unsafe { core::mem::transmute(&mut buf[..]) })?; let frame = Frame::from_bytes(&buf[..len])?;
137 self.loss.update(&frame);
138 frame.data.traces().into()
139 }
140 })
141 }
142
143 pub fn finish(&self) {
144 self.loss.analyze()
145 }
146}