use crate::config::*;
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::ps::ApsSettings;
use crate::ps::{AvPowerSpectra, CPSResult};
use crate::I;
use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use rayon::ThreadPool;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
type SharedRtApsStatus = Arc<Mutex<Option<RtApsResult>>>;
#[derive(Debug)]
enum RtApsMessage {
StopThread,
ResetStatus,
}
#[derive(Debug)]
pub enum RtApsResult {
NewResult(CPSResult),
NewMeta(Arc<StreamMetaData>),
}
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug)]
pub struct RtAps {
status: SharedRtApsStatus,
sender: Sender<RtApsMessage>,
}
impl RtAps {
pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps {
let status = Arc::new(Mutex::new(None));
let (sender, rx) = unbounded();
let aps = AvPowerSpectra::new(settings);
Self::startThread(aps, status.clone(), mgr, rx);
RtAps { status, sender }
}
fn startThread(
mut aps: AvPowerSpectra,
status: SharedRtApsStatus,
smgr: &mut StreamMgr,
rxmsg: Receiver<RtApsMessage>,
) {
let (tx, rxstream) = unbounded();
smgr.addInQueue(tx);
rayon::spawn(move || {
'mainloop: loop {
let mut last_cps: Option<CPSResult> = None;
let mut meta: Option<Arc<StreamMetaData>> = None;
if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() {
match msg {
InStreamMsg::StreamStarted(new_meta) => {
aps.reset();
last_cps = None;
meta = Some(new_meta);
}
InStreamMsg::StreamStopped | InStreamMsg::StreamError(_) => {
debug_assert!(meta.is_none());
last_cps = None;
}
InStreamMsg::InStreamData(id) => {
debug_assert!(meta.is_none());
let flt = id.getFloatData();
if let Some(cpsresult) = aps.compute_last(flt.view()) {
last_cps = Some(cpsresult.clone());
}
}
}
}
if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(1)).ok() {
match msg {
RtApsMessage::StopThread => {
let mut status = status.lock();
*status = None;
break 'mainloop;
}
RtApsMessage::ResetStatus => {
aps.reset();
}
}
}
'commscope: {
let mut status = status.lock();
if let Some(newmeta) = meta.take() {
*status = Some(RtApsResult::NewMeta(newmeta));
break 'commscope;
}
if let Some(RtApsResult::NewMeta(_)) = status.deref() {
break 'commscope;
}
if let Some(last_cps) = last_cps.take() {
*status = Some(RtApsResult::NewResult(last_cps));
}
} } });
}
pub fn get_last(&self) -> Option<RtApsResult> {
let mut lck = self.status.lock();
lck.take()
}
pub fn reset(&self) {
self.sender.send(RtApsMessage::ResetStatus).unwrap();
}
}
impl Drop for RtAps {
fn drop(&mut self) {
self.sender.send(RtApsMessage::StopThread).unwrap();
}
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl RtAps {
#[new]
fn new_py(smgr: &mut StreamMgr, settings: ApsSettings) -> Self {
RtAps::new(smgr, settings)
}
#[pyo3(name = "get_last")]
fn get_last_py<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyArray3<Cflt>>> {
let res = self.get_last();
if let Some(RtApsResult::NewResult(res)) = res {
return Some(res.to_pyarray_bound(py));
}
None
}
#[pyo3(name = "reset")]
fn reset_py(&self) {
self.reset()
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use anyhow::{anyhow, bail, Result};
use super::*;
use crate::{daq::StreamMgr, ps::ApsSettingsBuilder};
use std::thread;
#[test]
fn test_rtaps1() -> Result<(), anyhow::Error> {
{
let mut smgr = StreamMgr::new();
smgr.startDefaultInputStream()?;
let meta = smgr
.getStreamMetaData(crate::daq::StreamType::Input)
.ok_or_else(|| anyhow!("Stream is not running"))?;
let settings = ApsSettingsBuilder::default()
.nfft(2048)
.fs(meta.samplerate)
.build()
.unwrap();
let rtaps = RtAps::new(&mut smgr, settings);
thread::sleep(Duration::from_secs(2));
drop(rtaps);
}
Ok(())
}
}