use crate::daq::InStreamMsg;
use crate::daq::StreamMetaData;
use crate::daq::StreamMgr;
use crate::math::max;
use crate::math::maxabs;
use crate::math::min;
use crate::Flt;
use crossbeam::channel::unbounded;
use parking_lot::Mutex;
use std::sync::atomic::Ordering::Relaxed;
use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use super::ppm::ALMOST_CLIPPED_REL_AMP;
/// Watches an input stream from a background task and records whether any
/// channel came close to the limits of its range.
///
/// Both fields are shared with the background task spawned in `new`.
#[derive(Debug)]
pub struct SimpleClipDetector {
    /// Set to `true` by the background task when a block of data reaches the
    /// clip threshold; read by `hasClipped`.
    clipped: Arc<AtomicBool>,
    /// Set to `true` when the detector is dropped, asking the background task
    /// to exit.
    stopThread: Arc<AtomicBool>,
}
impl SimpleClipDetector {
pub fn new(smgr: &mut StreamMgr) -> Self {
let (tx, rx) = unbounded();
let clipstate = Arc::new(AtomicBool::new(false));
let stopThread = Arc::new(AtomicBool::new(false));
let clipstate2 = clipstate.clone();
let stopThread2 = stopThread.clone();
smgr.addInQueue(tx);
rayon::spawn(
move || {
let mut mins = vec![];
let mut maxs = vec![];
let mut ranges = vec![];
loop {
if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1500)) {
match msg {
InStreamMsg::StreamStarted(meta) => {
mins.resize(meta.nchannels(), 0.);
maxs.resize(meta.nchannels(), 0.);
ranges = meta.channelInfo.iter().map(|ch| ch.range).collect();
}
InStreamMsg::InStreamData(dat) => {
let flt = dat.getFloatData();
flt.columns()
.into_iter()
.zip(maxs.iter_mut())
.for_each(|(coli, maxi)| *maxi = max(coli));
flt.columns()
.into_iter()
.zip(mins.iter_mut())
.for_each(|(coli, mini)| *mini = min(coli));
maxs.iter().zip(mins.iter()).zip(ranges.iter()).for_each(
|((max, min), range)| {
let min_for_clip = ALMOST_CLIPPED_REL_AMP * range.0;
let max_for_clip = ALMOST_CLIPPED_REL_AMP * range.1;
if *max >= max_for_clip {
clipstate.store(true, Relaxed);
return;
}
if *min <= min_for_clip {
clipstate.store(true, Relaxed);
return;
}
},
);
}
_ => {}
} }; if stopThread.load(Relaxed) {
return;
}
} }, );
Self {
clipped: clipstate2,
stopThread: stopThread2,
}
}
pub fn hasClipped(&self) -> bool {
return self.clipped.load(Relaxed);
}
}
impl Drop for SimpleClipDetector {
fn drop(&mut self) {
self.stopThread.store(true, Relaxed);
}
}