use seify::Device;
use seify::DeviceTrait;
use seify::Direction::Tx;
use seify::TxStreamer;
use std::time::Duration;
use crate::blocks::seify::Config;
use crate::num_complex::Complex32;
use crate::prelude::*;
#[derive(Block)]
#[blocking]
#[message_inputs(freq, gain, sample_rate, cmd, config)]
#[message_outputs(terminate_out)]
#[type_name(SeifySink)]
pub struct Sink<D, IN = DefaultCpuReader<Complex32>>
where
D: DeviceTrait + Clone,
IN: CpuBufferReader<Item = Complex32>,
{
#[input]
inputs: Vec<IN>,
channels: Vec<usize>,
dev: Device<D>,
streamer: Option<D::TxStreamer>,
start_time: Option<i64>,
max_input_samples: usize,
}
impl<D, IN> Sink<D, IN>
where
D: DeviceTrait + Clone,
IN: CpuBufferReader<Item = Complex32>,
{
pub(super) fn new(dev: Device<D>, channels: Vec<usize>, start_time: Option<i64>) -> Self {
assert!(!channels.is_empty());
let mut inputs = Vec::new();
for _ in 0..channels.len() {
inputs.push(IN::default());
}
Self {
inputs,
channels,
dev,
start_time,
streamer: None,
max_input_samples: 0,
}
}
async fn cmd(
&mut self,
_io: &mut WorkIo,
_mio: &mut MessageOutputs,
_meta: &mut BlockMeta,
p: Pmt,
) -> Result<Pmt> {
let c: Config = p.try_into()?;
c.apply(&self.dev, &self.channels, Tx)?;
Ok(Pmt::Ok)
}
async fn freq(
&mut self,
_io: &mut WorkIo,
_mio: &mut MessageOutputs,
_meta: &mut BlockMeta,
p: Pmt,
) -> Result<Pmt> {
for c in &self.channels {
match &p {
Pmt::F32(v) => self.dev.set_frequency(Tx, *c, *v as f64)?,
Pmt::F64(v) => self.dev.set_frequency(Tx, *c, *v)?,
Pmt::U32(v) => self.dev.set_frequency(Tx, *c, *v as f64)?,
Pmt::U64(v) => self.dev.set_frequency(Tx, *c, *v as f64)?,
_ => return Ok(Pmt::InvalidValue),
};
}
Ok(Pmt::Ok)
}
async fn gain(
&mut self,
_io: &mut WorkIo,
_mio: &mut MessageOutputs,
_meta: &mut BlockMeta,
p: Pmt,
) -> Result<Pmt> {
for c in &self.channels {
match &p {
Pmt::F32(v) => self.dev.set_gain(Tx, *c, *v as f64)?,
Pmt::F64(v) => self.dev.set_gain(Tx, *c, *v)?,
Pmt::U32(v) => self.dev.set_gain(Tx, *c, *v as f64)?,
Pmt::U64(v) => self.dev.set_gain(Tx, *c, *v as f64)?,
_ => return Ok(Pmt::InvalidValue),
};
}
Ok(Pmt::Ok)
}
async fn sample_rate(
&mut self,
_io: &mut WorkIo,
_mio: &mut MessageOutputs,
_meta: &mut BlockMeta,
p: Pmt,
) -> Result<Pmt> {
for c in &self.channels {
match &p {
Pmt::F32(v) => self.dev.set_sample_rate(Tx, *c, *v as f64)?,
Pmt::F64(v) => self.dev.set_sample_rate(Tx, *c, *v)?,
Pmt::U32(v) => self.dev.set_sample_rate(Tx, *c, *v as f64)?,
Pmt::U64(v) => self.dev.set_sample_rate(Tx, *c, *v as f64)?,
_ => return Ok(Pmt::InvalidValue),
};
}
Ok(Pmt::Ok)
}
async fn config(
&mut self,
_io: &mut WorkIo,
_mio: &mut MessageOutputs,
_meta: &mut BlockMeta,
channel: Pmt,
) -> Result<Pmt> {
let id = match channel {
Pmt::Null | Pmt::Ok => 0,
Pmt::U32(id) => id as usize,
Pmt::U64(id) => id as usize,
Pmt::Usize(id) => id,
_ => return Ok(Pmt::InvalidValue),
};
if id >= self.channels.len() {
return Ok(Pmt::InvalidValue);
}
Ok(Config::from(&self.dev, Tx, id)?.to_serializable_pmt())
}
}
#[doc(hidden)]
impl<D, IN> Kernel for Sink<D, IN>
where
D: DeviceTrait + Clone,
IN: CpuBufferReader<Item = Complex32>,
{
async fn work(
&mut self,
io: &mut WorkIo,
mio: &mut MessageOutputs,
_meta: &mut BlockMeta,
) -> Result<()> {
let tags = self.inputs[0].slice_with_tags().1.clone();
let bufs: Vec<&[Complex32]> = self.inputs.iter_mut().map(|b| b.slice()).collect();
let streamer = self.streamer.as_mut().unwrap();
let nitems_per_input_stream: Vec<usize> = bufs.iter().map(|b| b.len()).collect();
let n = nitems_per_input_stream.iter().copied().min().unwrap_or(0);
let consumed = if n > 0 {
let t = tags.iter().find_map(|x| match x {
ItemTag {
index,
tag: Tag::NamedUsize(n, len),
} => {
if *index == 0 && n == "burst_start" {
Some(*len)
} else {
None
}
}
_ => None,
});
let consumed = if let Some(len) = t {
if n >= len {
let bufs: Vec<&[Complex32]> = bufs.iter().map(|b| &b[0..len]).collect();
let ret = streamer.write(&bufs, None, true, 2_000_000)?;
debug_assert_eq!(ret, len);
ret
} else if len > self.max_input_samples {
warn!(
"input buffers of seify sink too small to fit complete frame. sending in non-burst mode"
);
let bufs: Vec<&[Complex32]> = bufs.iter().map(|b| &b[0..n]).collect();
let ret = streamer.write(&bufs, None, true, 2_000_000)?;
debug_assert_eq!(ret, n);
ret
} else {
0
}
} else {
let ret = streamer.write(&bufs, None, false, 2_000_000)?;
if ret != n {
io.call_again = true;
}
ret
};
self.inputs.iter_mut().for_each(|i| i.consume(consumed));
consumed
} else {
0
};
io.finished = self
.inputs
.iter_mut()
.zip(nitems_per_input_stream)
.any(|(input, input_length)| input.finished() && input_length - consumed == 0);
if io.finished {
let smallest_sample_rate: f32 =
self.channels
.iter()
.map(|c| self.dev.sample_rate(Tx, *c).unwrap())
.fold(f64::INFINITY, |a, b| a.min(b)) as f32;
let termination_delay = consumed as f32 / smallest_sample_rate;
async_io::Timer::after(Duration::from_secs_f32(termination_delay + 0.5)).await;
mio.post("terminate_out", Pmt::Ok).await?;
}
Ok(())
}
async fn init(&mut self, _mio: &mut MessageOutputs, _meta: &mut BlockMeta) -> Result<()> {
self.max_input_samples = self
.inputs
.iter_mut()
.map(|i| i.slice().len())
.min()
.unwrap_or(0);
self.streamer = Some(self.dev.tx_streamer(&self.channels)?);
self.streamer
.as_mut()
.ok_or(Error::RuntimeError("Seify: no streamer".to_string()))?
.activate_at(self.start_time)?;
Ok(())
}
async fn deinit(&mut self, _mio: &mut MessageOutputs, _meta: &mut BlockMeta) -> Result<()> {
self.streamer
.as_mut()
.ok_or(Error::RuntimeError("Seify: no streamer".to_string()))?
.deactivate()?;
Ok(())
}
}