use super::config::*;
use super::*;
use crate::{
config::*,
siggen::{self, Siggen},
};
use anyhow::{bail, Error, Result};
use api::StreamApiDescr;
use array_init::from_iter;
use core::time;
use cpal::Sample;
use crossbeam::{
channel::{unbounded, Receiver, Sender, TrySendError},
thread,
};
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
use streamcmd::StreamCommand;
use streamdata::*;
use streammsg::*;
#[cfg(feature = "cpal-api")]
use super::api::{api_cpal::CpalApi, Stream};
/// Sending half of a channel that carries [`InStreamMsg`] values.
pub type SharedInQueue = Sender<InStreamMsg>;
/// A list of [`SharedInQueue`]s.
pub type InQueues = Vec<SharedInQueue>;
/// Bookkeeping for one running stream.
///
/// `T` is the value the worker thread returns when it is joined.
struct StreamInfo<T> {
/// Kind of stream recorded for this entry.
streamtype: StreamType,
/// Handle to the underlying API stream object.
stream: Box<dyn Stream>,
/// Join handle of the worker thread that belongs to this stream.
threadhandle: JoinHandle<T>,
/// Channel used to send [`StreamCommand`]s to the worker thread.
comm: Sender<StreamCommand>,
}
/// Flag that is `true` while a [`StreamMgr`] exists: it is set in
/// `StreamMgr::new` and cleared again when the manager is dropped.
static smgr_created: AtomicBool = AtomicBool::new(false);
/// Manager that owns the input and output streams, their worker threads,
/// the registered input queues and the signal generator.
///
/// Only one instance may exist at a time; `StreamMgr::new` panics otherwise.
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
pub struct StreamMgr {
/// Device list, filled by a device scan when the manager is created.
devs: Vec<DeviceInfo>,
/// Running input or duplex stream, if any. Its thread returns the input
/// queues when joined.
input_stream: Option<StreamInfo<InQueues>>,
/// Running output stream, if any. Its thread returns the signal generator
/// when joined.
output_stream: Option<StreamInfo<Siggen>>,
/// CPAL backend, only present when the `cpal-api` feature is enabled.
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi,
/// Input queues. They are moved into the input thread when it is started
/// (leaving `None` here) and put back when the input stream is stopped.
instreamqueues: Option<InQueues>,
/// Signal generator. It is moved into the output thread when it is started
/// and put back when the output stream is stopped.
siggen: Option<crate::siggen::Siggen>,
}
// Python-facing wrappers. Each method forwards to the Rust method of the same
// name (without the `_py` suffix) and is exported under the name given in its
// `#[pyo3(name = ...)]` attribute.
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMgr {
/// Constructor exposed to Python; forwards to `StreamMgr::new`.
#[new]
fn new_py<'py>() -> StreamMgr {
StreamMgr::new()
}
/// Forwards to `startDefaultInputStream`; errors are propagated via `?`.
#[pyo3(name = "startDefaultInputStream")]
fn startDefaultInputStream_py(&mut self) -> PyResult<()> {
Ok(self.startDefaultInputStream()?)
}
/// Forwards to `startDefaultOutputStream`; errors are propagated via `?`.
#[pyo3(name = "startDefaultOutputStream")]
fn startDefaultOutputStream_py(&mut self) -> PyResult<()> {
Ok(self.startDefaultOutputStream()?)
}
/// Forwards to `startStream` with stream type `st` and configuration `d`.
#[pyo3(name = "startStream")]
fn startStream_py(&mut self, st: StreamType, d: &DaqConfig) -> PyResult<()> {
Ok(self.startStream(st, d)?)
}
/// Forwards to `stopStream` for stream type `st`.
#[pyo3(name = "stopStream")]
fn stopStream_py(&mut self, st: StreamType) -> PyResult<()> {
Ok(self.stopStream(st)?)
}
/// Forwards to `getDeviceInfo`; this wrapper itself never returns an error.
#[pyo3(name = "getDeviceInfo")]
fn getDeviceInfo_py(&mut self) -> PyResult<Vec<DeviceInfo>> {
Ok(self.getDeviceInfo())
}
/// Forwards to `getStatus` for stream type `st`.
#[pyo3(name = "getStatus")]
fn getStatus_py(&self, st: StreamType) -> StreamStatus {
self.getStatus(st)
}
/// Forwards to `setSiggen`.
#[pyo3(name = "setSiggen")]
fn setSiggen_py(&mut self, siggen: Siggen) {
self.setSiggen(siggen)
}
}
impl Default for StreamMgr {
fn default() -> Self {
Self::new()
}
}
impl StreamMgr {
/// Create the stream manager and scan for available devices.
///
/// # Panics
///
/// Panics when another `StreamMgr` is alive: the manager is a singleton.
pub fn new() -> StreamMgr {
    // Claim the singleton flag in ONE atomic step. A separate load followed
    // by a store lets two threads both observe `false` and both proceed.
    if smgr_created.swap(true, std::sync::atomic::Ordering::SeqCst) {
        panic!("BUG: Only one stream manager is supposed to be a singleton");
    }
    let mut smgr = StreamMgr {
        devs: vec![],
        input_stream: None,
        output_stream: None,
        siggen: None,
        #[cfg(feature = "cpal-api")]
        cpal_api: CpalApi::new(),
        instreamqueues: Some(vec![]),
    };
    // The scan needs `&self` (it uses the API handles), so it runs after
    // construction and the result is stored afterwards.
    smgr.devs = smgr.scanDeviceInfo();
    smgr
}
/// Report the status of the stream of kind `t`.
///
/// `Input` and `Duplex` are both looked up in the input stream slot,
/// `Output` in the output stream slot. When the slot is empty,
/// `StreamStatus::NotRunning` is returned.
pub fn getStatus(&self, t: StreamType) -> StreamStatus {
    let reported = match t {
        StreamType::Input | StreamType::Duplex => {
            self.input_stream.as_ref().map(|info| info.stream.status())
        }
        StreamType::Output => self.output_stream.as_ref().map(|info| info.stream.status()),
    };
    match reported {
        Some(status) => status,
        None => StreamStatus::NotRunning {},
    }
}
/// Install a new signal generator.
///
/// * If a duplex stream is running, the generator is sent to its thread.
/// * Otherwise, if an output stream is running, it is sent to the output
///   thread.
/// * Otherwise it is stored and picked up when an output thread is started.
///
/// Previously, a running input-only stream shadowed the other two cases and
/// the generator was silently dropped.
///
/// # Panics
///
/// Panics if a generator is still stored while a receiving stream is
/// running, or if the command cannot be delivered to the stream thread.
pub fn setSiggen(&mut self, siggen: Siggen) {
    // Pick the command channel of the stream that consumes a generator.
    // A duplex stream takes precedence over a separate output stream.
    let target = match (&self.input_stream, &self.output_stream) {
        (Some(istream), _) if matches!(istream.streamtype, StreamType::Duplex) => {
            Some(&istream.comm)
        }
        (_, Some(ostream)) => Some(&ostream.comm),
        _ => None,
    };
    match target {
        Some(comm) => {
            assert!(self.siggen.is_none());
            comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
        }
        // Nothing is running that could use it: keep it for later.
        None => self.siggen = Some(siggen),
    }
}
/// Return a copy of the stored device list. No new scan is performed here.
pub fn getDeviceInfo(&mut self) -> Vec<DeviceInfo> {
    self.devs.to_vec()
}
/// Collect device information from every compiled-in API.
///
/// A failing CPAL query is ignored: its devices are just left out.
fn scanDeviceInfo(&self) -> Vec<DeviceInfo> {
    let mut found = Vec::new();
    #[cfg(feature = "cpal-api")]
    {
        if let Ok(cpal_found) = self.cpal_api.getDeviceInfo() {
            found.extend(cpal_found);
        }
    }
    found
}
/// Register a new queue for input stream messages.
///
/// While an input stream is running, the queue is handed to its thread via
/// an `AddInQueue` command. Otherwise it is appended to the stored queue
/// list.
///
/// # Panics
///
/// Panics if the command cannot be sent to the stream thread, or if the
/// stored queue list is absent while no input stream is registered.
pub fn addInQueue(&mut self, tx: Sender<InStreamMsg>) {
    match &self.input_stream {
        Some(running) => running.comm.send(StreamCommand::AddInQueue(tx)).unwrap(),
        None => self.instreamqueues.as_mut().unwrap().push(tx),
    }
}
/// Spawn the thread that distributes incoming raw data to all input queues.
///
/// The stored queue list is moved into the thread. The thread hands it back
/// as its return value once it receives `StopThread`.
///
/// Returns the join handle of the thread and the sender for its commands.
///
/// # Panics
///
/// Panics when the queue list is not available. The thread itself panics
/// when it receives a `NewSiggen` command.
fn startInputStreamThread(
    &mut self,
    meta: Arc<StreamMetaData>,
    rx: Receiver<RawStreamData>,
) -> (JoinHandle<InQueues>, Sender<StreamCommand>) {
    let (commtx, commrx) = unbounded();
    let mut iqueues = self
        .instreamqueues
        .take()
        .expect("No input streams queues!");

    let worker = move || {
        // Running block counter, passed to every `StreamData` created below.
        let mut ctr: usize = 0;
        loop {
            // At most one command is handled per round, without blocking.
            match commrx.try_recv() {
                Ok(StreamCommand::AddInQueue(queue)) => {
                    // Only keep the queue when the start notification
                    // could be delivered to it.
                    if queue
                        .send(InStreamMsg::StreamStarted(meta.clone()))
                        .is_ok()
                    {
                        iqueues.push(queue);
                    }
                }
                Ok(StreamCommand::StopThread) => {
                    sendMsgToAllQueuesRemoveUnused(&mut iqueues, InStreamMsg::StreamStopped);
                    break;
                }
                Ok(StreamCommand::NewSiggen(_)) => {
                    panic!("Error: signal generator send to input-only stream.");
                }
                Err(_) => {}
            }
            // Wait up to 10 ms for data, so commands are polled regularly.
            if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) {
                let block = Arc::new(StreamData::new(ctr, meta.clone(), raw));
                sendMsgToAllQueuesRemoveUnused(&mut iqueues, InStreamMsg::StreamData(block));
                ctr += 1;
            }
        }
        iqueues
    };

    (std::thread::spawn(worker), commtx)
}
/// Look up the device whose name and API both match `cfg`.
///
/// # Errors
///
/// Returns an error when no stored device matches.
fn find_device(&self, cfg: &DaqConfig) -> Result<&DeviceInfo> {
    for candidate in &self.devs {
        if candidate.device_name == cfg.device_name && candidate.api == cfg.api {
            return Ok(candidate);
        }
    }
    bail!("Could not find device with name {}.", cfg.device_name);
}
fn startOuputStreamThread(
&mut self,
meta: Arc<StreamMetaData>,
tx: Sender<RawStreamData>,
) -> (JoinHandle<Siggen>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded();
let nchannels = meta.nchannels();
let mut siggen = self
.siggen
.take()
.unwrap_or_else(|| Siggen::newSilence(nchannels));
if siggen.nchannels() != nchannels {
siggen.setNChannels(nchannels);
}
siggen.reset(meta.samplerate);
let threadhandle = std::thread::spawn(move || {
let mut floatbuf: Vec<Flt> = Vec::with_capacity(nchannels * meta.framesPerBlock);
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
StreamCommand::AddInQueue(_) => {
panic!("Invalid message send to output thread: AddInQueue");
}
StreamCommand::StopThread => {
break 'infy;
}
StreamCommand::NewSiggen(new_siggen) => {
siggen = new_siggen;
siggen.reset(meta.samplerate);
if siggen.nchannels() != nchannels {
siggen.setNChannels(nchannels);
}
}
}
}
while tx.len() < 2 {
unsafe {
floatbuf.set_len(nchannels * meta.framesPerBlock);
}
siggen.genSignal(&mut floatbuf);
let msg = match meta.rawDatatype {
DataType::I8 => {
let v = Vec::<i8>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai8(v)
}
DataType::I16 => {
let v = Vec::<i16>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai16(v)
}
DataType::I32 => {
let v = Vec::<i32>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai32(v)
}
DataType::F32 => {
let v = Vec::<f32>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Dataf32(v)
}
DataType::F64 => {
let v = Vec::<f64>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Dataf64(v)
}
};
if let Err(_e) = tx.send(msg) {
break 'infy;
}
}
}
siggen
});
(threadhandle, commtx)
}
/// Start a stream of kind `stype` using configuration `cfg`.
///
/// `Output` is dispatched to `startOutputStream`; `Input` and `Duplex` are
/// dispatched to `startInputOrDuplexStream`.
///
/// # Errors
///
/// Returns whatever error the selected start routine reports.
pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
    if let StreamType::Output = stype {
        self.startOutputStream(cfg)
    } else {
        self.startInputOrDuplexStream(stype, cfg)
    }
}
/// Start an output stream on the device described by `cfg`.
///
/// # Errors
///
/// Fails when a duplex or output stream is already running, when the
/// device cannot be found, when the requested API is unavailable or not
/// implemented, or when the API fails to start the stream.
fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> {
    // Same guards as `startDefaultOutputStream`. Without them a second call
    // would overwrite the running stream's bookkeeping without joining its
    // thread, and the signal generator held by that thread would be lost.
    if let Some(istream) = &self.input_stream {
        if istream.streamtype == StreamType::Duplex {
            bail!("Duplex stream is already running");
        }
    }
    if self.output_stream.is_some() {
        bail!("An output stream is already running. Please first stop existing output stream.");
    }

    let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
    let stream = match cfg.api {
        StreamApiDescr::Cpal => {
            let devinfo = self.find_device(cfg)?;
            cfg_if::cfg_if! {
                if #[cfg(feature="cpal-api")] {
                    self.cpal_api.startOutputStream(devinfo, cfg, rx)?
                } else {
                    bail!("API {} not available", cfg.api)
                }
            }
        }
        _ => bail!("API {} not implemented!", cfg.api),
    };
    let meta = stream.metadata();
    let (threadhandle, commtx) = self.startOuputStreamThread(meta, tx);
    self.output_stream = Some(StreamInfo {
        // Was recorded as `Input` before, which is wrong for this stream.
        streamtype: StreamType::Output,
        stream,
        threadhandle,
        comm: commtx,
    });
    Ok(())
}
/// Start an input or duplex stream on the device described by `cfg`.
///
/// All currently stored input queues receive a `StreamStarted` message
/// before the distribution thread is spawned.
///
/// # Errors
///
/// Fails when an input stream is already running, when no input channel is
/// enabled, when (duplex only) no output channel is enabled or an output
/// stream is running, when duplex is requested for the CPAL API, when the
/// device cannot be found, or when the API is unavailable or not
/// implemented.
fn startInputOrDuplexStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
    let wants_duplex = stype == StreamType::Duplex;

    if self.input_stream.is_some() {
        bail!("An input stream is already running. Please first stop existing input stream.")
    }
    if cfg.numberEnabledInChannels() == 0 {
        bail!("At least one input channel should be enabled for an input stream")
    }
    if wants_duplex && cfg.numberEnabledOutChannels() == 0 {
        bail!("At least one output channel should be enabled for a duplex stream")
    }
    if wants_duplex && self.output_stream.is_some() {
        bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream.");
    }

    let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
    let stream = match cfg.api {
        StreamApiDescr::Cpal => {
            if wants_duplex {
                bail!("Duplex mode not supported for CPAL api");
            }
            let devinfo = self.find_device(cfg)?;
            cfg_if::cfg_if! {
                if #[cfg(feature="cpal-api")] {
                    self.cpal_api.startInputStream(stype, devinfo, cfg, tx)?
                } else {
                    bail!("API {} not available", cfg.api)
                }
            }
        }
        _ => bail!("API {} not implemented!", cfg.api),
    };

    // Notify the queues that are already registered, then move them into
    // the distribution thread.
    let registered = self.instreamqueues.as_mut().unwrap();
    let metadata = stream.metadata();
    sendMsgToAllQueuesRemoveUnused(registered, InStreamMsg::StreamStarted(metadata.clone()));
    let (worker, cmd_tx) = self.startInputStreamThread(metadata, rx);

    let info = StreamInfo {
        streamtype: stype,
        stream,
        threadhandle: worker,
        comm: cmd_tx,
    };
    self.input_stream = Some(info);
    Ok(())
}
/// Start the default input stream, using the CPAL API.
///
/// All currently stored input queues receive a `StreamStarted` message
/// before the distribution thread is spawned.
///
/// # Errors
///
/// Fails when an input stream is already running, when the CPAL API is not
/// compiled in, or when CPAL fails to start the stream.
pub fn startDefaultInputStream(&mut self) -> Result<()> {
    if self.input_stream.is_some() {
        bail!("Input stream is already running. Please first stop existing input stream.")
    }
    let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
    cfg_if::cfg_if! {
        if #[cfg(feature="cpal-api")] {
            let stream = self.cpal_api.startDefaultInputStream(tx)?;
            // Notify the queues that are already registered, then move
            // them into the distribution thread.
            let registered = self.instreamqueues.as_mut().unwrap();
            let metadata = stream.metadata();
            sendMsgToAllQueuesRemoveUnused(
                registered,
                InStreamMsg::StreamStarted(metadata.clone()),
            );
            let (worker, cmd_tx) = self.startInputStreamThread(metadata, rx);
            let info = StreamInfo {
                streamtype: StreamType::Input,
                stream,
                threadhandle: worker,
                comm: cmd_tx,
            };
            self.input_stream = Some(info);
            Ok(())
        }
        else {
            bail!("Unable to start default input stream: no CPAL api available")
        }
    }
}
/// Start the default output stream, using the CPAL API.
///
/// # Errors
///
/// Fails when a duplex or output stream is already running, when the CPAL
/// API is not compiled in, or when CPAL fails to start the stream.
pub fn startDefaultOutputStream(&mut self) -> Result<()> {
    if let Some(istream) = &self.input_stream {
        if istream.streamtype == StreamType::Duplex {
            bail!("Duplex stream is already running");
        }
    }
    if self.output_stream.is_some() {
        // The former text talked about duplex mode, which does not apply
        // to this call.
        bail!("An output stream is already running. Please first stop existing output stream.");
    }
    cfg_if::cfg_if! {
        if #[cfg(feature="cpal-api")] {
            let (tx, rx) = unbounded();
            let stream = self.cpal_api.startDefaultOutputStream(rx)?;
            let meta = stream.metadata();
            let (threadhandle, commtx) = self.startOuputStreamThread(meta, tx);
            self.output_stream = Some(StreamInfo {
                // Was recorded as `Input` before, which is wrong here.
                streamtype: StreamType::Output,
                stream,
                threadhandle,
                comm: commtx,
            });
            Ok(())
        } else {
            // The former text said "input stream" (copy-paste slip).
            bail!("Unable to start default output stream: no CPAL api available")
        }
    }
}
/// Stop the running input (or duplex) stream.
///
/// The thread is told to stop and joined; the queue list it returns is
/// stored again in the manager.
///
/// # Errors
///
/// Fails when no input stream is running.
///
/// # Panics
///
/// Panics if the stop command cannot be sent or if the thread panicked.
pub fn stopInputStream(&mut self) -> Result<()> {
    match self.input_stream.take() {
        None => bail!("Stream is not running."),
        Some(StreamInfo {
            threadhandle, comm, ..
        }) => {
            comm.send(StreamCommand::StopThread).unwrap();
            let returned_queues = threadhandle.join().expect("Stream thread panicked!");
            self.instreamqueues = Some(returned_queues);
            Ok(())
        }
    }
}
/// Stop the running output stream.
///
/// The thread is told to stop and joined; the signal generator it returns
/// is stored again in the manager.
///
/// # Errors
///
/// Fails when no output stream is running.
///
/// # Panics
///
/// Panics if the stop command cannot be sent while the thread has not
/// finished, or if the thread panicked.
pub fn stopOutputStream(&mut self) -> Result<()> {
    match self.output_stream.take() {
        None => bail!("Stream is not running."),
        Some(StreamInfo {
            threadhandle, comm, ..
        }) => {
            let stop_delivered = comm.send(StreamCommand::StopThread).is_ok();
            if !stop_delivered {
                // An undeliverable command is only accepted when the thread
                // has already ended.
                assert!(threadhandle.is_finished());
            }
            let returned_siggen = threadhandle.join().expect("Output thread panicked!");
            self.siggen = Some(returned_siggen);
            Ok(())
        }
    }
}
/// Stop the stream of kind `st`.
///
/// `Output` stops the output stream; `Input` and `Duplex` both stop the
/// input stream.
///
/// # Errors
///
/// Fails when the selected stream is not running.
pub fn stopStream(&mut self, st: StreamType) -> Result<()> {
    if matches!(st, StreamType::Output) {
        self.stopOutputStream()
    } else {
        self.stopInputStream()
    }
}
} impl Drop for StreamMgr {
fn drop(&mut self) {
if self.input_stream.is_some() {
self.stopStream(StreamType::Input).unwrap();
}
if self.output_stream.is_some() {
self.stopStream(StreamType::Output).unwrap();
}
smgr_created.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
/// Send a clone of `msg` to every queue in `iqueues`. Queues for which the
/// send fails are removed from the list.
fn sendMsgToAllQueuesRemoveUnused(iqueues: &mut InQueues, msg: InStreamMsg) {
    iqueues.retain(|queue| queue.try_send(msg.clone()).is_ok());
}