use super::{ppmdropspeed::FromToUsize, PPMDropSpeed};
pub use super::{ClipState, PPMChannelStatus};
use crate::daq::{InStreamMsg, StreamMetaData};
use crate::{config::*, FreqWeighting, StandardFilterDescriptor};
use crate::{daq::StreamMgr, Dcol};
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use rayon::iter::{
IndexedParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator,
};
use smallvec::{smallvec, SmallVec};
use std::default;
use std::ops::DerefMut;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
/// Per-channel meter status, shared between the background processing task
/// (writer) and [`PPM::getState`] (reader). Up to 24 channel entries are stored
/// inline before the `SmallVec` spills to the heap.
type SharedPPMStatus = Arc<Mutex<SmallVec<[PPMChannelStatus; 24]>>>;
/// Handle to a PPM (presumably "peak programme meter" — confirm naming) that is
/// fed by stream messages processed on a background task.
///
/// The handle itself only holds the shared state; all processing happens in the
/// task started by `startThread`. Dropping the handle sets the stop flag so the
/// task terminates.
#[derive(Debug)]
pub struct PPM {
/// Latest status of every channel; cleared whenever no stream metadata is known.
status: SharedPPMStatus,
/// Set to `true` on drop; polled by the background task at the top of its loop.
stopThread: Arc<AtomicBool>,
/// Pending drop-speed change, stored as an `Option<PPMDropSpeed>` encoded through
/// `FromToUsize`. The background task swaps the value out (replacing it with `0`)
/// and applies it when it decodes to `Some`.
updateDropSpeed: Arc<AtomicUsize>,
}
impl PPM {
pub fn newInput(mgr: &mut StreamMgr, dropSpeed: PPMDropSpeed) -> Arc<Self> {
if let Some(ppm) = mgr.getPPMInput() {
ppm.updateDropSpeed(dropSpeed);
return ppm;
}
let (tx, rxstream) = unbounded();
mgr.addInQueue(tx);
let me = Self::startThread(rxstream, dropSpeed);
let me = Arc::new(me);
mgr.setPPMInput(&me);
me
}
pub fn newMonitor(mgr: &mut StreamMgr, dropSpeed: PPMDropSpeed) -> Arc<Self> {
if let Some(ppm) = mgr.getPPMMon() {
ppm.updateDropSpeed(dropSpeed);
return ppm;
}
let (tx, rxstream) = unbounded();
mgr.addMonitorQueue(tx);
let me = Self::startThread(rxstream, dropSpeed);
let me = Arc::new(me);
mgr.setPPMMon(&me);
me
}
fn startThread(rxstream: Receiver<InStreamMsg>, initDropSpeed: PPMDropSpeed) -> Self {
let stopThread = Arc::new(AtomicBool::new(false));
let updateDropSpeed = Arc::new(AtomicUsize::new(None.to_usize()));
let status = Arc::new(Mutex::new(smallvec![]));
let me = PPM {
status: status.clone(),
stopThread: stopThread.clone(),
updateDropSpeed: updateDropSpeed.clone(),
};
rayon::spawn(move || {
let mut dropSpeed = initDropSpeed;
let mut meta = None;
let reintialize = |dropSpeed, meta: Option<&Arc<StreamMetaData>>| {
if let Some(meta) = meta {
let mut s = status.lock();
s.clear();
(0..meta.nchannels()).for_each(|ch_index| {
s.push(PPMChannelStatus::new(meta, ch_index, dropSpeed));
});
} else {
status.lock().clear();
}
};
'ppmloop: loop {
if stopThread.load(Ordering::Relaxed) {
break 'ppmloop;
}
if let Some(newDropspeed) =
FromToUsize::from_usize(updateDropSpeed.swap(0, Ordering::Relaxed))
{
dropSpeed = newDropspeed;
reintialize(dropSpeed, meta.as_ref());
}
if let Ok(msg) = rxstream
.recv_timeout(std::time::Duration::from_millis(10))
{
match msg {
InStreamMsg::InStreamData(d) => {
let mut status = status.lock();
let floatdata = d.getFloatData();
status.par_iter_mut().enumerate().for_each(|(i, ch)| {
ch.process(floatdata.column(i));
});
}
InStreamMsg::StreamStarted(newmeta) => {
meta = Some(newmeta);
reintialize(dropSpeed, meta.as_ref());
}
InStreamMsg::StreamError(_e) => {
meta = None;
reintialize(dropSpeed, meta.as_ref());
}
InStreamMsg::StreamStopped => {
meta = None;
reintialize(dropSpeed, meta.as_ref());
}
}
}
}
});
me
}
pub fn getState(&self) -> (Dcol, Dcol, Vec<ClipState>) {
let status = self.status.lock();
let levels = Dcol::from_iter(status.iter().map(|s| s.signal_level));
let peak_levels = Dcol::from_iter(status.iter().map(|s| s.peak_level));
let clips = status.iter().map(|s| s.clipstate).collect();
(levels, peak_levels, clips)
}
pub fn updateDropSpeed(&self, newDropSpeed: PPMDropSpeed) {
self.updateDropSpeed
.store(Some(newDropSpeed).to_usize(), Ordering::Relaxed);
}
}
/// Stops the background processing task when the meter handle is dropped.
impl Drop for PPM {
/// Raises the stop flag. The processing task checks this flag at the start of
/// each loop iteration and waits at most 10 ms per iteration for a message, so
/// it ends shortly after this call; the task is not joined here.
fn drop(&mut self) {
self.stopThread.store(true, Ordering::Relaxed);
}
}