use crate::config::*;
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::math::{max, min};
use crate::ps::timebuffer::TimeBuffer;
use anyhow::{anyhow, bail, Result};
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
/// A single viewer update: the full history buffer produced by the worker.
/// The worker allocates it with `N` rows and 2 columns, storing the minimum
/// of each chunk in column 0 and the maximum in column 1.
type RtViewUpdate = Array2<Flt>;
/// Slot shared between the worker and the `RtViewer` handle. The worker
/// stores the latest update (or an error) in it; `get_last` takes the value
/// out, leaving `None` behind.
type SharedRtViewerStatus = Arc<Mutex<Option<Result<RtViewUpdate>>>>;
/// Handle to a real-time viewer of one channel of an input stream.
///
/// Creating a viewer starts a background worker that condenses incoming
/// samples into a fixed-length history of per-chunk minimum and maximum
/// values. Dropping the handle asks the worker to stop.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug)]
pub struct RtViewer {
/// Length of the shown time history, in seconds (the `t_hist` passed to `new`).
pub T: Flt,
/// Number of points in the history (the `resolution` passed to `new`).
pub N: usize,
/// Control channel to the worker; used to send `RtMsg::StopThread` on drop.
sender: Sender<RtMsg>,
/// Latest update or error published by the worker, consumed by `get_last`.
status: SharedRtViewerStatus,
}
impl RtViewer {
/// Create a new real-time viewer and start its background worker.
///
/// # Arguments
///
/// * `smgr` - Stream manager on which the worker's input queue is registered.
/// * `t_hist` - Time history to show, in seconds. Must be finite and > 0.
/// * `resolution` - Number of points in the shown history. Must be > 1.
/// * `channel` - Index of the stream channel to show. It is not checked
///   here; the worker validates it against the stream metadata when a
///   stream starts and reports a violation through `get_last`.
///
/// # Errors
///
/// Returns an error when `t_hist` is NaN, infinite, or not strictly
/// positive, or when `resolution` is 0 or 1.
pub fn new(
    smgr: &mut StreamMgr,
    t_hist: Flt,
    resolution: usize,
    channel: usize,
) -> Result<RtViewer> {
    // NaN compares false against everything, so `t_hist <= 0.` alone would
    // let it through; reject it explicitly.
    if t_hist.is_nan() || t_hist <= 0. {
        bail!("The time history to show in [s] should be > 0.")
    }
    // An infinite history yields a meaningless time axis and chunk size.
    if t_hist.is_infinite() {
        bail!("The time history to show in [s] should be finite.");
    }
    if resolution <= 1 {
        bail!("Invalid resolution. Please use value > 1");
    }

    let status = Arc::new(Mutex::new(None));
    let (sender, rxmsg) = unbounded();
    Self::startThread(smgr, rxmsg, t_hist, resolution, channel, status.clone());

    Ok(RtViewer {
        T: t_hist,
        N: resolution,
        sender,
        status,
    })
}
fn startThread(
smgr: &mut StreamMgr,
rxmsg: Receiver<RtMsg>,
t_hist: Flt,
N: usize,
channel: usize,
status: SharedRtViewerStatus,
) {
let (tx, rxstream) = unbounded();
smgr.addInQueue(tx);
rayon::spawn(move || {
let mut buf = Array2::zeros((N, 2).f());
let zero_out = |mut arr: ArrayViewMut2<Flt>| {
arr.mapv_inplace(|_| 0.);
};
let mut circbuf = TimeBuffer::new();
let mut nsamples_per_output = 1;
let T = t_hist / N as Flt;
'mainloop: loop {
if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() {
match msg {
InStreamMsg::StreamStarted(new_meta) => {
if channel >= new_meta.nchannels() {
*status.lock() = Some(Err(anyhow!("Invalid channel. Chosen channel is higher than maximum amount of channels in stream, which is {}", new_meta.nchannels())));
return;
}
let T_ac = 1. / new_meta.samplerate;
nsamples_per_output = std::cmp::max((T / T_ac) as usize, 1);
}
InStreamMsg::StreamStopped | InStreamMsg::StreamError(_) => {
circbuf = TimeBuffer::new();
zero_out(buf.view_mut());
*status.lock() = None;
}
InStreamMsg::InStreamData(id) => {
let fd = id.getFloatData();
let dat = fd.column(channel);
circbuf.push(dat.slice(s![.., NewAxis]));
}
}
}
let mut nnew = circbuf.nsamples() / nsamples_per_output;
if nnew > 0 {
buf.column_mut(0).as_slice_mut().unwrap().rotate_left(nnew);
buf.column_mut(1).as_slice_mut().unwrap().rotate_left(nnew);
while let Some(samples) = circbuf.pop(nsamples_per_output, 0) {
let max = max(samples.view());
let min = min(samples.view());
buf[[N - nnew, 0]] = min;
buf[[N - nnew, 1]] = max;
nnew -= 1;
}
let new = Ok(buf.clone());
*status.lock() = Some(new);
}
if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(1)).ok() {
match msg {
RtMsg::StopThread => {
let mut status = status.lock();
*status = None;
break 'mainloop;
}
}
}
}
});
}
/// Build the time axis of the history: `N` linearly spaced values running
/// from `-T * (N - 1) / N` up to `0`.
pub fn getTimeArray(&self) -> Dcol {
    let npoints = self.N;
    // Keep the multiply-then-divide order so the end points are computed
    // with exactly the same rounding as before.
    let t_oldest = -self.T * ((npoints - 1) as Flt) / npoints as Flt;
    Dcol::linspace(t_oldest, 0., npoints)
}
/// Take the most recent result published by the worker, if any.
///
/// Returns `Some(Ok(update))` with the latest history, `Some(Err(_))` when
/// the worker reported an error, or `None` when nothing new is available.
/// The stored value is removed, so a second call returns `None` until the
/// worker publishes again.
pub fn get_last(&self) -> Option<Result<RtViewUpdate>> {
    // `take` already yields the `Option` we need; no re-wrapping required.
    self.status.lock().take()
}
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl RtViewer {
    /// Python constructor; forwards all arguments to `RtViewer::new`.
    #[new]
    fn new_py(
        smgr: &mut StreamMgr,
        t_hist: Flt,
        resolution: usize,
        channel: usize,
    ) -> Result<RtViewer> {
        RtViewer::new(smgr, t_hist, resolution, channel)
    }

    /// Python wrapper around `getTimeArray`, returning the time axis as a
    /// one-dimensional NumPy array.
    #[pyo3(name = "getTimeArray")]
    fn getTimeArray_py<'py>(&self, py: Python<'py>) -> Bound<'py, PyArray1<Flt>> {
        let time_axis = self.getTimeArray();
        time_axis.to_pyarray_bound(py)
    }

    /// Python wrapper around `get_last`: returns the latest update as a
    /// two-dimensional NumPy array, `None` when there is nothing new, and
    /// raises when the worker reported an error.
    #[pyo3(name = "get_last")]
    fn get_last_py<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyArray2<Flt>>>> {
        match self.get_last() {
            Some(update) => Ok(Some(update?.to_pyarray_bound(py))),
            None => Ok(None),
        }
    }
}
impl Drop for RtViewer {
    /// Ask the worker to stop when the viewer handle goes away.
    fn drop(&mut self) {
        // The worker may already be gone: it returns on its own after
        // reporting an invalid channel, which drops its receiver and makes
        // `send` fail. A failed send then simply means there is nothing left
        // to stop, and panicking inside `drop` must be avoided, so the
        // result is deliberately ignored.
        let _ = self.sender.send(RtMsg::StopThread);
    }
}
/// Control messages sent from the `RtViewer` handle to its worker.
enum RtMsg {
/// Request the worker to clear the shared status and leave its main loop.
StopThread,
}
#[cfg(test)]
mod test {
    use super::RtViewer;
    use crate::daq::StreamMgr;

    /// A 10 s history at 10 points must give a time axis from -9 s to 0 s.
    #[test]
    fn test_rtviewer1() {
        let mut smgr = StreamMgr::new();
        let (t_hist, resolution, channel) = (10., 10, 0);

        // The viewer only lives long enough to produce the time axis.
        let a = {
            let viewer = RtViewer::new(&mut smgr, t_hist, resolution, channel).unwrap();
            viewer.getTimeArray()
        };
        dbg!(&a);

        assert_eq!(a.last(), Some(&0.));
        assert_eq!(a.first(), Some(&-9.));
    }
}