use crate::daq::{InStreamMsg, StreamMetaData};
use crate::math::{max, maxabs, min};
use crate::slm::{self, SLMSettingsBuilder, TimeWeighting, SLM};
use crate::{config::*, FreqWeighting, StandardFilterDescriptor};
use crate::{daq::StreamMgr, Dcol};
use crossbeam::channel::{unbounded, Receiver, Sender};
use crossbeam::utils::Backoff;
use parking_lot::Mutex;
use std::default;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::{Duration, Instant};
pub const ALMOST_CLIPPED_REL_AMP: Flt = 0.98;
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(2);
const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.;
const LEVEL_THRESHOLD_FOR_HIGH_LEVEL: Flt = -10.;
type SharedPPMStatus = Arc<Mutex<Vec<PPMChannelStatus>>>;
fn level(lin: Flt) -> Flt {
20. * lin.log10()
}
#[derive(Debug)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct PPM {
status: SharedPPMStatus,
sender: Sender<PPMMessage>,
}
impl PPM {
pub fn new(mgr: &mut StreamMgr) -> Self {
let (sender, rxmsg) = unbounded();
let status: SharedPPMStatus = Arc::new(Mutex::new(vec![]));
Self::startThread(status.clone(), mgr, rxmsg);
PPM { status, sender }
}
fn startThread(status: SharedPPMStatus, mgr: &mut StreamMgr, rxmsg: Receiver<PPMMessage>) {
let (tx, rxstream) = unbounded();
mgr.addInQueue(tx);
rayon::spawn(move || {
let mut slms: Vec<SLM> = vec![];
let mut ranges: Vec<(Flt, Flt)> = vec![];
let resetall = |slms: &mut Vec<SLM>| {
let mut status = status.lock();
*status = Default::default();
slms.clear();
};
loop {
if let Some(msg) = rxstream
.recv_timeout(std::time::Duration::from_millis(10))
.ok()
{
match msg {
InStreamMsg::InStreamData(d) => {
let mut status = status.lock();
let floatdata = d.getFloatData();
'channel: for (chno, ((slm, ppmstatus), range)) in slms
.iter_mut()
.zip(status.iter_mut())
.zip(ranges.iter())
.enumerate()
{
let chdata = floatdata.slice(s![.., chno]);
let min_val = min(chdata);
let max_val = max(chdata);
let chdata = chdata
.as_slice()
.expect("Data not contiguous on sample axis");
slm.run(chdata, false);
let last_level = slm.Ltlast()[0];
ppmstatus.level = last_level;
if let Some(moment) = ppmstatus.clip_time {
if moment.elapsed() > CLIP_INDICATOR_WAIT_S {
ppmstatus.clip = ClipState::LevelFine;
ppmstatus.clip_time = None;
}
continue 'channel;
}
let clip = min_val <= ALMOST_CLIPPED_REL_AMP * range.0
|| max_val >= ALMOST_CLIPPED_REL_AMP * range.1;
let abs_range = if range.0.abs() > range.1.abs() {
range.0.abs()
} else {
range.1.abs()
};
let high_level_threshold =
level(abs_range) + LEVEL_THRESHOLD_FOR_HIGH_LEVEL;
let low_level_threshold =
level(abs_range) + LEVEL_THRESHOLD_FOR_LOW_LEVEL;
let high_level = last_level > high_level_threshold;
let low_level = last_level < low_level_threshold;
ppmstatus.clip = if clip {
ppmstatus.clip_time = Some(Instant::now());
ClipState::Clipped
} else if high_level {
ClipState::HighLevel
} else if low_level {
ClipState::LowLevel
} else {
ClipState::LevelFine
}
}
}
InStreamMsg::StreamError(_e) => {
resetall(&mut slms);
}
InStreamMsg::StreamStarted(meta) => {
slms.clear();
let mut s = status.lock();
(0..meta.nchannels()).for_each(|_ch_index| {
let slmsettings = SLMSettingsBuilder::default()
.fs(meta.samplerate)
.freqWeighting(FreqWeighting::Z)
.Lref(1.0)
.timeWeighting(TimeWeighting::Impulse {})
.filterDescriptors([
StandardFilterDescriptor::Overall().unwrap()
])
.build()
.unwrap();
slms.push(SLM::new(slmsettings.clone()));
s.push(PPMChannelStatus {
clip: ClipState::LowLevel,
level: -300.,
clip_time: None,
});
});
ranges = meta.channelInfo.iter().map(|ch| ch.range).collect();
}
InStreamMsg::StreamStopped => {}
}
for msg in rxmsg.try_iter() {
match msg {
PPMMessage::ResetClip => {
let mut s = status.lock();
s.iter_mut().for_each(|c| c.clip = ClipState::LevelFine);
}
PPMMessage::StopThread => {
resetall(&mut slms);
return;
}
}
}
}
}
});
}
pub fn resetClip(&self) {
self.sender.send(PPMMessage::ResetClip).unwrap();
}
pub fn getState(&self) -> (Dcol, Vec<ClipState>) {
let status = self.status.lock();
let levels = Dcol::from_iter(status.iter().map(|s| s.level));
let clips = status.iter().map(|s| s.clip).collect();
(levels, clips)
}
}
impl Drop for PPM {
fn drop(&mut self) {
self.sender.send(PPMMessage::StopThread).unwrap();
}
}
#[cfg_attr(feature = "python-bindings", pyclass(eq, eq_int))]
#[derive(Copy, Debug, PartialEq, Clone, Default)]
pub enum ClipState {
#[default]
LowLevel,
LevelFine,
HighLevel,
Clipped,
}
#[derive(Debug, Default)]
struct PPMChannelStatus {
level: Flt,
clip: ClipState,
clip_time: Option<Instant>,
}
enum PPMMessage {
ResetClip,
StopThread,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl PPM {
#[new]
fn new_py(smgr: &mut StreamMgr) -> Self {
Self::new(smgr)
}
#[pyo3(name = "getState")]
fn getState_py<'py>(&self, py: Python<'py>) -> (Bound<'py, PyArray1<Flt>>, Vec<ClipState>) {
let (levels, clips) = self.getState();
let levels = levels.to_pyarray_bound(py);
(levels, clips)
}
#[pyo3(name = "resetClip")]
fn resetClip_py(&self) {
self.resetClip()
}
}