use std::borrow::Cow;
use crate::block::{Block, BlockRet};
use crate::stream::{NCReadStream, NCWriteStream, ReadStream, Tag, WriteStream};
use crate::{Complex, Float, Result, Sample};
#[derive(rustradio_macros::Block)]
#[rustradio(
crate,
custom_name,
sync_tag,
new,
bound = "In: Sample, F: Fn(In, &[Tag]) + Send"
)]
pub struct Inspect<In, F> {
#[rustradio(into)]
name: String,
f: F,
#[rustradio(in)]
src: ReadStream<In>,
#[rustradio(out)]
dst: WriteStream<In>,
}
impl<In, F> Inspect<In, F>
where
In: Sample,
F: Fn(In, &[Tag]) + Send,
{
fn process_sync_tags<'a>(&mut self, s: In, tags: &'a [Tag]) -> (In, Cow<'a, [Tag]>) {
(self.f)(s, tags);
(s, Cow::Borrowed(tags))
}
}
impl<In, F> Inspect<In, F> {
pub fn custom_name(&self) -> &str {
&self.name
}
}
#[derive(rustradio_macros::Block)]
#[rustradio(crate, new, bound = "T: Default+Clone")]
pub struct Parse<T> {
#[rustradio(in)]
src: ReadStream<u8>,
#[rustradio(out)]
dst: WriteStream<T>,
}
impl<T: Sample<Type = T>> Block for Parse<T> {
fn work(&mut self) -> Result<BlockRet<'_>> {
loop {
let (i, _) = self.src.read_buf()?;
if i.len() < T::size() {
return Ok(BlockRet::WaitForStream(&self.src, 1));
}
let mut o = self.dst.write_buf()?;
if o.is_empty() {
return Ok(BlockRet::WaitForStream(&self.dst, 1));
}
let items = o.len().min(i.len() / T::size());
let os = o.slice();
for (n, s) in i
.slice()
.chunks_exact(T::size())
.take(items)
.map(|chunk| T::parse(chunk))
.enumerate()
{
os[n] = s?;
}
o.produce(items, &[]);
i.consume(items * T::size());
}
}
}
#[derive(rustradio_macros::Block)]
#[rustradio(
crate,
custom_name,
sync_tag,
new,
bound = "In: Sample, Out: Sample",
bound = "F: for<'a> Fn(In, &'a [Tag]) -> (Out, Cow<'a, [Tag]>) + Send"
)]
pub struct Map<In, Out, F> {
#[rustradio(into)]
name: String,
map: F,
#[rustradio(in)]
src: ReadStream<In>,
#[rustradio(out)]
dst: WriteStream<Out>,
}
#[allow(clippy::type_complexity)]
impl Map<(), (), ()> {
#[allow(clippy::type_complexity)]
pub fn keep_tags<In, Out, Name, F2>(
src: ReadStream<In>,
name: Name,
f: F2,
) -> (
Map<In, Out, impl for<'a> Fn(In, &'a [Tag]) -> (Out, Cow<'a, [Tag]>)>,
ReadStream<Out>,
)
where
In: Sample,
Out: Sample,
Name: Into<String>,
F2: Fn(In) -> Out + Send,
{
Map::new(src, name, move |s, tags| (f(s), Cow::Borrowed(tags)))
}
}
impl<In, Out, F> Map<In, Out, F>
where
In: Sample,
Out: Sample,
F: for<'a> Fn(In, &'a [Tag]) -> (Out, Cow<'a, [Tag]>) + Send,
{
fn process_sync_tags<'a>(&mut self, s: In, tags: &'a [Tag]) -> (Out, Cow<'a, [Tag]>) {
(self.map)(s, tags)
}
}
impl<In, Out, F> Map<In, Out, F> {
pub fn custom_name(&self) -> &str {
&self.name
}
}
#[derive(rustradio_macros::Block)]
#[rustradio(
crate,
custom_name,
new,
bound = "In: Send + Sync",
bound = "Out: Send + Sync",
bound = "F: Fn(In, Vec<Tag>) -> Vec<(Out, Vec<Tag>)> + Send"
)]
pub struct NCMap<In, Out, F> {
#[rustradio(into)]
name: String,
map: F,
#[rustradio(in)]
src: NCReadStream<In>,
#[rustradio(out)]
dst: NCWriteStream<Out>,
}
impl<In, Out, F> Block for NCMap<In, Out, F>
where
In: Send + Sync,
Out: Send + Sync,
F: Fn(In, Vec<Tag>) -> Vec<(Out, Vec<Tag>)> + Send,
{
fn work(&mut self) -> Result<BlockRet<'_>> {
loop {
let Some((x, tags)) = self.src.pop() else {
return Ok(BlockRet::WaitForStream(&self.src, 1));
};
for (packet, new_tags) in (self.map)(x, tags) {
self.dst.push(packet, new_tags);
}
}
}
}
impl<In, Out, F> NCMap<In, Out, F> {
pub fn custom_name(&self) -> &str {
&self.name
}
}
#[derive(rustradio_macros::Block)]
#[rustradio(crate, new, sync)]
pub struct FloatToComplex {
#[rustradio(in)]
re: ReadStream<Float>,
#[rustradio(in)]
im: ReadStream<Float>,
#[rustradio(out)]
dst: WriteStream<Complex>,
}
impl FloatToComplex {
fn process_sync(&self, re: Float, im: Float) -> Complex {
Complex::new(re, im)
}
}
#[derive(rustradio_macros::Block)]
#[rustradio(crate, new, sync)]
pub struct ComplexToFloat {
#[rustradio(in)]
src: ReadStream<Complex>,
#[rustradio(out)]
re: WriteStream<Float>,
#[rustradio(out)]
im: WriteStream<Float>,
}
impl ComplexToFloat {
fn process_sync(&self, c: Complex) -> (Float, Float) {
(c.re, c.im)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::blocks::VectorSinkNoCopy;
use crate::stream::{Tag, TagValue, new_nocopy_stream};
#[test]
fn ncmap_identity() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
let (mut m, out) = NCMap::new(rx, "nctest", |packet, tags| vec![(packet, tags)]);
let mut sink = VectorSinkNoCopy::new(out, 10);
let res = sink.storage();
tx.push(vec![0u8, 1, 2, 3], &[]);
tx.push(
vec![9u8, 33, 22, 11],
&[Tag::new(0, "foo", TagValue::U64(42))],
);
m.work()?;
sink.work()?;
let r = res.lock().unwrap();
assert_eq!(
&**r,
vec![
(vec![0u8, 1, 2, 3], vec![]),
(
vec![9u8, 33, 22, 11],
vec![Tag::new(0, "foo", TagValue::U64(42))],
),
]
);
Ok(())
}
#[test]
fn ncmap_drop() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
let (mut m, out) = NCMap::new(rx, "nctest", |_packet, _tags| vec![]);
let mut sink: VectorSinkNoCopy<Vec<u8>> = VectorSinkNoCopy::new(out, 10);
let res = sink.storage();
tx.push(vec![0u8, 1, 2, 3], &[]);
m.work()?;
sink.work()?;
let r = res.lock().unwrap();
assert_eq!(&**r, vec![]);
Ok(())
}
#[test]
fn ncmap_multipacket() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
let (mut m, out) = NCMap::new(rx, "nctest", |packet: Vec<u8>, tags| {
vec![
(packet.iter().map(|s| *s * 2).collect(), tags.clone()),
(packet.iter().map(|s| *s * 20).collect(), tags.clone()),
]
});
let mut sink = VectorSinkNoCopy::new(out, 10);
let res = sink.storage();
tx.push(vec![0u8, 1, 2, 3], &[]);
m.work()?;
sink.work()?;
let r = res.lock().unwrap();
assert_eq!(
&**r,
vec![
(vec![0u8, 2, 4, 6], vec![]),
(vec![0u8, 20, 40, 60], vec![]),
]
);
Ok(())
}
#[test]
fn ncmap_double() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
let (mut m, out) = NCMap::new(rx, "nctest", |packet: Vec<u8>, tags| {
vec![(packet.iter().map(|s| *s * 2).collect(), tags)]
});
let mut sink = VectorSinkNoCopy::new(out, 10);
let res = sink.storage();
tx.push(vec![0u8, 1, 2, 3], &[]);
m.work()?;
sink.work()?;
let r = res.lock().unwrap();
assert_eq!(&**r, vec![(vec![0u8, 2, 4, 6], vec![])]);
Ok(())
}
#[test]
fn ncmap_double_inplace() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
let (mut m, out) = NCMap::new(rx, "nctest", |mut packet: Vec<u8>, tags| {
for v in &mut packet {
*v = *v + *v;
}
vec![(packet, tags)]
});
let mut sink = VectorSinkNoCopy::new(out, 10);
let res = sink.storage();
tx.push(vec![0u8, 1, 2, 3], &[]);
m.work()?;
sink.work()?;
let r = res.lock().unwrap();
assert_eq!(&**r, vec![(vec![0u8, 2, 4, 6], vec![])]);
Ok(())
}
#[test]
fn ncmap_convert() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
let (mut m, out) = NCMap::new(rx, "nctest", |packet: Vec<u8>, tags| {
vec![(packet.iter().map(|s| *s as Float + 0.1).collect(), tags)]
});
let mut sink = VectorSinkNoCopy::new(out, 10);
let res = sink.storage();
tx.push(vec![0u8, 1, 2, 3], &[]);
m.work()?;
sink.work()?;
let r = res.lock().unwrap();
assert_eq!(&**r, vec![(vec![0.1 as Float, 1.1, 2.1, 3.1], vec![])]);
Ok(())
}
#[test]
fn ncmap_append() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
let (mut m, out) = NCMap::new(rx, "nctest", |packet: Vec<u8>, tags| {
let mut p2 = packet.clone();
p2.extend(packet);
vec![(p2, tags)]
});
let mut sink = VectorSinkNoCopy::new(out, 10);
let res = sink.storage();
tx.push(vec![0u8, 1, 2, 3], &[]);
m.work()?;
sink.work()?;
let r = res.lock().unwrap();
assert_eq!(&**r, vec![(vec![0u8, 1, 2, 3, 0, 1, 2, 3], vec![])]);
Ok(())
}
}