stabilizer_stream/
source.rs

1use crate::{Frame, Loss};
2use anyhow::Result;
3use clap::Parser;
4use rand::{rngs::SmallRng, Rng, SeedableRng};
5use socket2::{Domain, Protocol, Socket, Type};
6use std::{
7    fs::File,
8    io::{BufReader, ErrorKind, Read, Seek},
9    net::{Ipv4Addr, SocketAddr},
10    time::Duration,
11};
12
13/// Stabilizer stream source options
14#[derive(Parser, Debug, Clone)]
15pub struct SourceOpts {
16    /// The local IP to receive streaming data on.
17    #[arg(short, long, default_value = "0.0.0.0")]
18    ip: std::net::Ipv4Addr,
19
20    /// The UDP port to receive streaming data on.
21    #[arg(short, long, default_value_t = 9293)]
22    port: u16,
23
24    /// Use frames from the given file
25    #[arg(short, long)]
26    file: Option<String>,
27
28    /// Frame size in file (8 + n_batches*n_channel*batch_size)
29    #[arg(short, long, default_value_t = 8 + 30 * 2 * 6 * 4)]
30    frame_size: usize,
31
32    /// On a file, wrap around and repeat
33    #[arg(short, long)]
34    repeat: bool,
35
36    /// Single le f32 raw trace, architecture dependent endianess
37    #[arg(short, long)]
38    single: Option<String>,
39
40    /// Power law noise with psd f^noise.
41    #[arg(short, long)]
42    noise: Option<i32>,
43}
44
45#[derive(Debug)]
46enum Data {
47    Udp(Socket),
48    File(BufReader<File>),
49    Single(BufReader<File>),
50    Noise((SmallRng, bool, Vec<f32>)),
51}
52
53pub struct Source {
54    opts: SourceOpts,
55    data: Data,
56    loss: Loss,
57}
58
59impl Source {
60    pub fn new(opts: SourceOpts) -> Result<Self> {
61        let data = if let Some(noise) = opts.noise {
62            Data::Noise((
63                SmallRng::seed_from_u64(0x7654321),
64                noise > 0,
65                vec![0.0; noise.unsigned_abs() as _],
66            ))
67        } else if let Some(file) = &opts.file {
68            Data::File(BufReader::with_capacity(1 << 20, File::open(file)?))
69        } else if let Some(single) = &opts.single {
70            Data::Single(BufReader::with_capacity(1 << 20, File::open(single)?))
71        } else {
72            log::info!("Binding to {}:{}", opts.ip, opts.port);
73            let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
74            socket.set_read_timeout(Some(Duration::from_millis(1000)))?;
75            socket.set_recv_buffer_size(1 << 20)?;
76            socket.set_reuse_address(true)?;
77            if opts.ip.is_multicast() {
78                socket.join_multicast_v4(&opts.ip, &Ipv4Addr::UNSPECIFIED)?;
79            }
80            socket.bind(&SocketAddr::new(opts.ip.into(), opts.port).into())?;
81            Data::Udp(socket)
82        };
83        Ok(Self {
84            opts,
85            data,
86            loss: Loss::default(),
87        })
88    }
89
90    pub fn get(&mut self) -> Result<Vec<Vec<f32>>> {
91        Ok(match &mut self.data {
92            Data::Noise((rng, diff, state)) => {
93                vec![rng
94                    .sample_iter(rand::distributions::Open01)
95                    .map(|mut x| {
96                        x = (x - 0.5) * 12.0f32.sqrt(); // zero mean, RMS = 1
97                        state.iter_mut().fold(x, |mut x, s| {
98                            (x, *s) = if *diff { (x - *s, x) } else { (*s, x + *s) };
99                            x
100                        })
101                    })
102                    .take(4096)
103                    .collect()]
104            }
105            Data::File(fil) => loop {
106                let mut buf = [0u8; 2048];
107                match fil.read_exact(&mut buf[..self.opts.frame_size]) {
108                    Ok(()) => {
109                        let frame = Frame::from_bytes(&buf[..self.opts.frame_size])?;
110                        self.loss.update(&frame);
111                        break frame.data.traces().into();
112                    }
113                    Err(e) if e.kind() == ErrorKind::UnexpectedEof && self.opts.repeat => {
114                        fil.seek(std::io::SeekFrom::Start(0))?;
115                    }
116                    Err(e) => Err(e)?,
117                }
118            },
119            Data::Single(fil) => loop {
120                let mut buf = [0u8; 2048];
121                match fil.read(&mut buf[..]) {
122                    Ok(len) => {
123                        if len == 0 && self.opts.repeat {
124                            fil.seek(std::io::SeekFrom::Start(0))?;
125                            continue;
126                        }
127                        let v: &[[u8; 4]] = bytemuck::cast_slice(&buf[..len / 4 * 4]);
128                        break vec![v.iter().map(|b| f32::from_le_bytes(*b)).collect()];
129                    }
130                    Err(e) => Err(e)?,
131                }
132            },
133            Data::Udp(socket) => {
134                let mut buf = [0u8; 2048];
135                let len = socket.recv(unsafe { core::mem::transmute(&mut buf[..]) })?; // meh
136                let frame = Frame::from_bytes(&buf[..len])?;
137                self.loss.update(&frame);
138                frame.data.traces().into()
139            }
140        })
141    }
142
143    pub fn finish(&self) {
144        self.loss.analyze()
145    }
146}