lasprs 0.8.0

Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)
Documentation
use super::{ppmdropspeed::FromToUsize, PPMDropSpeed};
pub use super::{ClipState, PPMChannelStatus};
use crate::daq::{InStreamMsg, StreamMetaData};
use crate::{config::*, FreqWeighting, StandardFilterDescriptor};
use crate::{daq::StreamMgr, Dcol};
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use rayon::iter::{
    IndexedParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator,
};
use smallvec::{smallvec, SmallVec};
use std::default;
use std::ops::DerefMut;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

type SharedPPMStatus = Arc<Mutex<SmallVec<[PPMChannelStatus; 24]>>>;

/// Peak programme meter implementation, including clip detector. Effectively uses a realtime SLM on all
/// input channels. Also includes a clipping detector.
#[derive(Debug)]
pub struct PPM {
    // Latest and createst status
    status: SharedPPMStatus,
    stopThread: Arc<AtomicBool>,
    updateDropSpeed: Arc<AtomicUsize>,
}
impl PPM {
    /// Initialize a new PPM meter for data from an input stream
    ///
    /// Args
    ///
    /// - `mgr`: Stream manager instance.
    /// - `dropSpeed`: Speed with which the PPM output drops when all future
    ///   signal data equals 0.
    pub fn newInput(mgr: &mut StreamMgr, dropSpeed: PPMDropSpeed) -> Arc<Self> {
        if let Some(ppm) = mgr.getPPMInput() {
            ppm.updateDropSpeed(dropSpeed);
            return ppm;
        }

        // Obtain messages from stream manager
        let (tx, rxstream) = unbounded();

        mgr.addInQueue(tx);

        let me = Self::startThread(rxstream, dropSpeed);
        let me = Arc::new(me);

        mgr.setPPMInput(&me);
        me
    }
    /// Initialize a new PPM meter for data from an input stream
    ///
    /// Args
    ///
    /// - `mgr`: Stream manager instance.
    /// - `dropSpeed`: Speed with which the PPM output drops when all future
    ///   signal data equals 0.
    pub fn newMonitor(mgr: &mut StreamMgr, dropSpeed: PPMDropSpeed) -> Arc<Self> {
        if let Some(ppm) = mgr.getPPMMon() {
            ppm.updateDropSpeed(dropSpeed);
            return ppm;
        }
        // Obtain messages from stream manager
        let (tx, rxstream) = unbounded();

        mgr.addMonitorQueue(tx);

        let me = Self::startThread(rxstream, dropSpeed);
        let me = Arc::new(me);
        mgr.setPPMMon(&me);
        me
    }
    fn startThread(rxstream: Receiver<InStreamMsg>, initDropSpeed: PPMDropSpeed) -> Self {
        let stopThread = Arc::new(AtomicBool::new(false));
        let updateDropSpeed = Arc::new(AtomicUsize::new(None.to_usize()));
        let status = Arc::new(Mutex::new(smallvec![]));
        let me = PPM {
            status: status.clone(),
            stopThread: stopThread.clone(),
            updateDropSpeed: updateDropSpeed.clone(),
        };

        rayon::spawn(move || {
            let mut dropSpeed = initDropSpeed;
            let mut meta = None;

            let reintialize = |dropSpeed, meta: Option<&Arc<StreamMetaData>>| {
                if let Some(meta) = meta {
                    let mut s = status.lock();
                    s.clear();
                    (0..meta.nchannels()).for_each(|ch_index| {
                        s.push(PPMChannelStatus::new(meta, ch_index, dropSpeed));
                    });
                } else {
                    status.lock().clear();
                }
            };

            'ppmloop: loop {
                if stopThread.load(Ordering::Relaxed) {
                    break 'ppmloop;
                }
                if let Some(newDropspeed) =
                    FromToUsize::from_usize(updateDropSpeed.swap(0, Ordering::Relaxed))
                {
                    dropSpeed = newDropspeed;
                    reintialize(dropSpeed, meta.as_ref());
                }
                if let Ok(msg) = rxstream
                    .recv_timeout(std::time::Duration::from_millis(10))
                {
                    match msg {
                        InStreamMsg::InStreamData(d) => {
                            let mut status = status.lock();
                            let floatdata = d.getFloatData();

                            status.par_iter_mut().enumerate().for_each(|(i, ch)| {
                                ch.process(floatdata.column(i));
                            });
                        }
                        InStreamMsg::StreamStarted(newmeta) => {
                            meta = Some(newmeta);
                            reintialize(dropSpeed, meta.as_ref());
                        }
                        InStreamMsg::StreamError(_e) => {
                            meta = None;
                            reintialize(dropSpeed, meta.as_ref());
                        }
                        InStreamMsg::StreamStopped => {
                            meta = None;
                            reintialize(dropSpeed, meta.as_ref());
                        }
                    }
                }
            }
        });
        me
    }

    /// Returns the current state: levels, peak levels and clip state
    pub fn getState(&self) -> (Dcol, Dcol, Vec<ClipState>) {
        let status = self.status.lock();
        let levels = Dcol::from_iter(status.iter().map(|s| s.signal_level));
        let peak_levels = Dcol::from_iter(status.iter().map(|s| s.peak_level));
        let clips = status.iter().map(|s| s.clipstate).collect();
        (levels, peak_levels, clips)
    }

    /// Update the level drop Speed to a new value
    pub fn updateDropSpeed(&self, newDropSpeed: PPMDropSpeed) {
        self.updateDropSpeed
            .store(Some(newDropSpeed).to_usize(), Ordering::Relaxed);
    }
}
impl Drop for PPM {
    fn drop(&mut self) {
        // Stop the thread
        self.stopThread.store(true, Ordering::Relaxed);
    }
}