lasprs 0.6.7

Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)
Documentation
use crate::config::*;
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::ps::ApsSettings;
use crate::ps::{AvPowerSpectra, CPSResult};
use crate::I;
use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use rayon::ThreadPool;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

/// Shared, mutex-protected slot holding the most recent result that has not
/// been taken yet. The processing thread writes into it; [`RtAps::get_last`]
/// takes the value out, leaving `None` behind.
type SharedRtApsStatus = Arc<Mutex<Option<RtApsResult>>>;

/// Control messages sent from an [`RtAps`] handle to its processing thread.
#[derive(Debug)]
enum RtApsMessage {
    /// Ask the processing thread to clear the shared status and leave its loop.
    StopThread,
    /// Ask the processing thread to reset the power spectra estimator.
    ResetStatus,
}
/// Result type coming from the real time averaged power spectra computation
/// engine. A value of this type is what [`RtAps::get_last`] hands out.
#[derive(Debug)]
pub enum RtApsResult {
    /// A newly computed (cross-)power spectra result.
    NewResult(CPSResult),
    /// Metadata of a stream that has just started. The processing thread does
    /// not replace this value with a spectra result; it stays in place until
    /// it has been taken.
    NewMeta(Arc<StreamMetaData>),
}

/// Real time power spectra viewer. Gives access to the most recently computed
/// cross-power / auto-power spectra of the incoming stream data, so the
/// spectra can be followed as they change over time.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug)]
pub struct RtAps {
    /// Storage for optional last result. Shared with the processing thread,
    /// which fills it; taking the result empties it again.
    status: SharedRtApsStatus,

    // For sending control messages (stop / reset) to the data processing
    // thread.
    sender: Sender<RtApsMessage>,
}

impl RtAps {
    /// Create new Real time power spectra computing engine.
    pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps {
        // Handler needs to be created here.
        let status = Arc::new(Mutex::new(None));

        let (sender, rx) = unbounded();
        let aps = AvPowerSpectra::new(settings);
        Self::startThread(aps, status.clone(), mgr, rx);

        RtAps { status, sender }
    }

    fn startThread(
        mut aps: AvPowerSpectra,
        status: SharedRtApsStatus,
        smgr: &mut StreamMgr,
        rxmsg: Receiver<RtApsMessage>,
    ) {
        // Obtain messages from stream manager
        let (tx, rxstream) = unbounded();

        // Add queue sender part of queue to stream manager
        smgr.addInQueue(tx);

        rayon::spawn(move || {
            // What is running on the thread
            'mainloop: loop {
                let mut last_cps: Option<CPSResult> = None;
                let mut meta: Option<Arc<StreamMetaData>> = None;

                if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() {
                    match msg {
                        InStreamMsg::StreamStarted(new_meta) => {
                            aps.reset();
                            last_cps = None;
                            meta = Some(new_meta);
                        }
                        InStreamMsg::StreamStopped | InStreamMsg::StreamError(_) => {
                            debug_assert!(meta.is_none());
                            last_cps = None;
                        }
                        InStreamMsg::InStreamData(id) => {
                            debug_assert!(meta.is_none());
                            let flt = id.getFloatData();
                            if let Some(cpsresult) = aps.compute_last(flt.view()) {
                                last_cps = Some(cpsresult.clone());
                            }
                        }
                    }
                }

                // Check for messages and act accordingly
                if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(1)).ok() {
                    match msg {
                        RtApsMessage::StopThread => {
                            let mut status = status.lock();
                            *status = None;
                            break 'mainloop;
                        }
                        RtApsMessage::ResetStatus => {
                            aps.reset();
                        }
                    }
                }

                // Communicate last result, if any.
                'commscope: {
                    let mut status = status.lock();

                    if let Some(newmeta) = meta.take() {
                        // New metadata has arrived. This is always the first
                        // thing to push. Only when it is read, we will start
                        // pushing actual data.
                        *status = Some(RtApsResult::NewMeta(newmeta));
                        break 'commscope;
                    }

                    if let Some(RtApsResult::NewMeta(_)) = status.deref() {
                        // New metadata is not yet read by reading thread. It
                        // basically means we are not yet ready to give actual
                        // data back.
                        break 'commscope;
                    }
                    // Move last_cps into mutex.
                    if let Some(last_cps) = last_cps.take() {
                        *status = Some(RtApsResult::NewResult(last_cps));
                    }
                } // end of commscope
            } // End of mainloop
        });
    }

    /// Take last updated result.
    pub fn get_last(&self) -> Option<RtApsResult> {
        let mut lck = self.status.lock();
        lck.take()
    }

    /// Reset power spectra estimator, start with a clean sleeve
    pub fn reset(&self) {
        self.sender.send(RtApsMessage::ResetStatus).unwrap();
    }
}
impl Drop for RtAps {
    /// Ask the processing thread to stop when the handle goes away.
    fn drop(&mut self) {
        // Best effort: sending only fails when the receiving side of the
        // channel no longer exists, in which case there is nothing left to
        // stop. Panicking here instead would abort the process if this drop
        // runs while another panic is already unwinding.
        let _ = self.sender.send(RtApsMessage::StopThread);
    }
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl RtAps {
    // Python-side constructor, forwards to `RtAps::new`.
    #[new]
    fn new_py(smgr: &mut StreamMgr, settings: ApsSettings) -> Self {
        Self::new(smgr, settings)
    }

    // Hands out the last spectra result as an array. Metadata is not
    // forwarded by this method: when the stored value is metadata (or when
    // nothing is stored), `None` is returned. Metadata should come from
    // somewhere else.
    #[pyo3(name = "get_last")]
    fn get_last_py<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyArray3<Cflt>>> {
        match self.get_last() {
            Some(RtApsResult::NewResult(cps)) => Some(cps.to_pyarray_bound(py)),
            _ => None,
        }
    }

    // Python-side name for `RtAps::reset`.
    #[pyo3(name = "reset")]
    fn reset_py(&self) {
        RtAps::reset(self)
    }
}

#[cfg(test)]
mod test {
    use std::thread;
    use std::time::Duration;

    use anyhow::{anyhow, bail, Result};

    use super::*;
    use crate::{daq::StreamMgr, ps::ApsSettingsBuilder};

    /// Smoke test: start the default input stream, attach an `RtAps` to it,
    /// let it run for two seconds and tear it down again. Fails when the
    /// stream cannot be started or reports no metadata.
    #[test]
    fn test_rtaps1() -> Result<(), anyhow::Error> {
        let mut smgr = StreamMgr::new();
        smgr.startDefaultInputStream()?;

        // The sample rate for the estimator is taken from the running stream.
        let Some(meta) = smgr.getStreamMetaData(crate::daq::StreamType::Input) else {
            bail!("Stream is not running");
        };

        let builder = ApsSettingsBuilder::default().nfft(2048).fs(meta.samplerate);
        let settings = builder.build().unwrap();

        let rtaps = RtAps::new(&mut smgr, settings);
        thread::sleep(Duration::from_secs(2));

        // Drop the engine explicitly, before the stream manager goes away.
        drop(rtaps);
        Ok(())
    }
}