lasprs 0.6.7

Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)
Documentation
use crate::config::*;
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::math::{max, min};
use crate::ps::timebuffer::TimeBuffer;
use anyhow::{anyhow, bail, Result};
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;

/// A single viewer update: one row per output time step, with the minimum of
/// the step in column 0 and the maximum in column 1.
type RtViewUpdate = Array2<Flt>;
/// Slot shared between the worker and the viewer. Holds the latest update (or
/// an error raised by the worker), or `None` when nothing is pending.
type SharedRtViewerStatus = Arc<Mutex<Option<Result<RtViewUpdate>>>>;

/// Real time viewer that tracks the envelope (minimum and maximum per output
/// time step) of one channel of the input stream over a sliding time history.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug)]
pub struct RtViewer {
    /// The length of the shown time history in \[s\], as passed to the
    /// constructor. The time step per output is `T / N`.
    pub T: Flt,
    /// The number of samples in the time buffer
    pub N: usize,

    /// Control channel used to tell the background worker to stop.
    sender: Sender<RtMsg>,

    /// Latest update (or error) published by the background worker.
    status: SharedRtViewerStatus,
}
impl RtViewer {
    /// Create a new real time signal viewer. Outputs buffers of the signal
    /// envelope (minimum and maximum) as a function of time.
    ///
    /// # Args
    ///
    /// - `smgr` - The stream manager. An input queue is registered on it, over
    ///   which the background worker receives the stream messages.
    /// - `t_hist` - The time history to show in \[s\].
    /// - `resolution` - The number of samples within this history. This should
    ///   correspond more / less to the amount of pixels used in the viewer.
    /// - `channel` - Index of the stream channel to track. It can only be
    ///   checked once a stream starts; when it is out of range, the worker
    ///   stops and the error is reported through [`RtViewer::get_last`].
    ///
    /// # Errors
    ///
    /// Returns an error when `t_hist` is not a finite number > 0, or when
    /// `resolution` is not > 1.
    pub fn new(
        smgr: &mut StreamMgr,
        t_hist: Flt,
        resolution: usize,
        channel: usize,
    ) -> Result<RtViewer> {
        // NaN compares false against everything and would slip through the
        // range check below; an infinite history would never produce output.
        if !t_hist.is_finite() {
            bail!("The time history to show in [s] should be a finite number.")
        }
        if t_hist <= 0. {
            bail!("The time history to show in [s] should be > 0.")
        }
        if resolution <= 1 {
            bail!("Invalid resolution. Please use value > 1");
        }

        let status = Arc::new(Mutex::new(None));

        // Control channel: the viewer keeps the sending side, the worker the
        // receiving side.
        let (sender, rxmsg) = unbounded();
        Self::startThread(smgr, rxmsg, t_hist, resolution, channel, status.clone());

        Ok(RtViewer {
            T: t_hist,
            N: resolution,
            sender,
            status,
        })
    }
    /// Spawn the background worker that turns incoming stream data into
    /// min / max envelope updates.
    ///
    /// The worker registers an input queue on `smgr`, collects the samples of
    /// `channel` and, for every `t_hist / N` seconds worth of samples, appends
    /// one (min, max) row to an `N x 2` buffer. After each change a copy of
    /// the buffer is stored in `status`.
    ///
    /// # Args
    ///
    /// - `smgr` - Stream manager on which the input queue is registered.
    /// - `rxmsg` - Control channel; a `RtMsg::StopThread` ends the worker.
    /// - `t_hist` - The time history covered by the buffer in \[s\].
    /// - `N` - Number of output rows in the buffer.
    /// - `channel` - Index of the stream channel to track.
    /// - `status` - Shared slot in which updates and errors are published.
    fn startThread(
        smgr: &mut StreamMgr,
        rxmsg: Receiver<RtMsg>,
        t_hist: Flt,
        N: usize,
        channel: usize,
        status: SharedRtViewerStatus,
    ) {
        let (tx, rxstream) = unbounded();
        smgr.addInQueue(tx);
        rayon::spawn(move || {
            // Column-major storage, such that each column can be rotated as a
            // contiguous slice. Column 0 holds the minima, column 1 the maxima.
            let mut buf = Array2::zeros((N, 2).f());

            let zero_out = |mut arr: ArrayViewMut2<Flt>| {
                arr.mapv_inplace(|_| 0.);
            };

            let mut circbuf = TimeBuffer::new();
            // Placeholder value, should be updated when stream is started.
            let mut nsamples_per_output = 1;
            // Time step per output row
            let T = t_hist / N as Flt;

            'mainloop: loop {
                if let Ok(msg) = rxstream.recv_timeout(Duration::from_millis(10)) {
                    match msg {
                        InStreamMsg::StreamStarted(new_meta) => {
                            if channel >= new_meta.nchannels() {
                                // The chosen channel is too high. We stop and
                                // kill here.
                                *status.lock() = Some(Err(anyhow!("Invalid channel. Chosen channel is higher than maximum amount of channels in stream, which is {}", new_meta.nchannels())));
                                return;
                            }

                            // Calculate the number of samples per timestamp of
                            // output
                            let T_ac = 1. / new_meta.samplerate;
                            nsamples_per_output = std::cmp::max((T / T_ac) as usize, 1);
                        }
                        InStreamMsg::StreamStopped | InStreamMsg::StreamError(_) => {
                            // Start from a clean slate for the next stream
                            circbuf = TimeBuffer::new();
                            zero_out(buf.view_mut());
                            *status.lock() = None;
                        }
                        InStreamMsg::InStreamData(id) => {
                            // Push data on circular buffer
                            let fd = id.getFloatData();
                            let dat = fd.column(channel);
                            circbuf.push(dat.slice(s![.., NewAxis]));
                        }
                    }
                }

                // See if we can update the time buffer
                let navailable = circbuf.nsamples() / nsamples_per_output;
                if navailable > 0 {
                    // The buffer holds at most N outputs. Shifting by more
                    // than N would panic in `rotate_left` and underflow the
                    // row index below, so the shift is clamped to N.
                    let nnew = std::cmp::min(navailable, N);

                    // Outputs older than the newest N would be shifted out of
                    // view right away; drop them without processing.
                    for _ in nnew..navailable {
                        if circbuf.pop(nsamples_per_output, 0).is_none() {
                            break;
                        }
                    }

                    // Rotate back samples, and add new.
                    buf.column_mut(0).as_slice_mut().unwrap().rotate_left(nnew);
                    buf.column_mut(1).as_slice_mut().unwrap().rotate_left(nnew);

                    let mut row = N - nnew;
                    while row < N {
                        match circbuf.pop(nsamples_per_output, 0) {
                            Some(samples) => {
                                buf[[row, 0]] = min(samples.view());
                                buf[[row, 1]] = max(samples.view());
                                row += 1;
                            }
                            None => break,
                        }
                    }
                    *status.lock() = Some(Ok(buf.clone()));
                }

                // Check for messages and act accordingly
                if let Ok(msg) = rxmsg.recv_timeout(Duration::from_millis(1)) {
                    match msg {
                        RtMsg::StopThread => {
                            *status.lock() = None;
                            break 'mainloop;
                        }
                    }
                }
            }
        });
    }

    /// Time axis belonging to the output data: `N` linearly spaced instances,
    /// running from `-T * (N - 1) / N` up to and including 0.
    pub fn getTimeArray(&self) -> Dcol {
        let npoints = self.N as Flt;
        // Oldest instance lies one output step short of the full history
        let t_oldest = -self.T * ((self.N - 1) as Flt) / npoints;
        let t_newest = 0.;
        Dcol::linspace(t_oldest, t_newest, self.N)
    }

    /// Get the last updated value of the array of mins / max, or an error from
    /// the system.
    ///
    /// The stored value is moved out of the shared slot: once it has been
    /// returned, subsequent calls give `None` until a new value is stored.
    pub fn get_last(&self) -> Option<Result<RtViewUpdate>> {
        self.status.lock().take()
    }
}

#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl RtViewer {
    #[new]
    fn new_py(
        smgr: &mut StreamMgr,
        t_hist: Flt,
        resolution: usize,
        channel: usize,
    ) -> Result<RtViewer> {
        RtViewer::new(smgr, t_hist, resolution, channel)
    }

    // Python wrapper around `getTimeArray`: converts the time axis to a
    // one-dimensional Python array.
    #[pyo3(name = "getTimeArray")]
    fn getTimeArray_py<'py>(&self, py: Python<'py>) -> Bound<'py, PyArray1<Flt>> {
        let time_axis = self.getTimeArray();
        time_axis.to_pyarray_bound(py)
    }
    // Python wrapper around `get_last`: `None` when nothing is pending, the
    // update converted to a two-dimensional Python array otherwise. An error
    // stored by the worker is propagated to the caller.
    #[pyo3(name = "get_last")]
    fn get_last_py<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyArray2<Flt>>>> {
        match self.get_last() {
            None => Ok(None),
            Some(update) => {
                let arr = update?;
                Ok(Some(arr.to_pyarray_bound(py)))
            }
        }
    }
}
impl Drop for RtViewer {
    /// Ask the background worker to stop.
    fn drop(&mut self) {
        // The worker may already have exited by itself (it returns when the
        // chosen channel turns out to be invalid), in which case the receiving
        // side is gone and the send fails. There is nothing left to stop
        // then, so the error is ignored instead of panicking inside `drop`.
        let _ = self.sender.send(RtMsg::StopThread);
    }
}

/// Control messages sent from the viewer to its background worker.
enum RtMsg {
    /// Leave the worker loop, clearing the shared status first.
    StopThread,
}

#[cfg(test)]
mod test {
    use crate::daq::StreamMgr;

    use super::RtViewer;

    /// The time axis has its newest instance at 0 and its oldest instance one
    /// output step (`t_hist / resolution`) short of the full history.
    #[test]
    fn test_rtviewer1() {
        let mut smgr = StreamMgr::new();
        let t_hist = 10.;
        let resolution = 10;
        let channel = 0;
        let a = RtViewer::new(&mut smgr, t_hist, resolution, channel)
            .unwrap()
            .getTimeArray();
        assert_eq!(a.last(), Some(&0.));
        assert_eq!(a.first(), Some(&-9.));
    }
}