use futures::channel::mpsc::Receiver;
use futures::channel::mpsc::Sender;
use futures::channel::mpsc::channel;
use futuresdr_types::BlockId;
use std::any::Any;
use std::fmt::Debug;
use std::ops::Deref;
use std::ops::DerefMut;
use crate::runtime::BlockMessage;
use crate::runtime::Error;
use crate::runtime::ItemTag;
use crate::runtime::Kernel;
use crate::runtime::KernelInterface;
use crate::runtime::Pmt;
use crate::runtime::PortId;
use crate::runtime::WorkIo;
use crate::runtime::WrappedKernel;
use crate::runtime::buffer::BufferReader;
use crate::runtime::buffer::BufferWriter;
use crate::runtime::buffer::CpuBufferReader;
use crate::runtime::buffer::CpuBufferWriter;
use crate::runtime::buffer::CpuSample;
use crate::runtime::buffer::Tags;
use crate::runtime::config::config;
pub struct Mocker<K: Kernel> {
pub block: WrappedKernel<K>,
message_sinks: Vec<Receiver<BlockMessage>>,
messages: Vec<Vec<Pmt>>,
}
impl<K: KernelInterface + Kernel + 'static> Deref for Mocker<K> {
type Target = WrappedKernel<K>;
fn deref(&self) -> &Self::Target {
&self.block
}
}
impl<K: KernelInterface + Kernel + 'static> DerefMut for Mocker<K> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.block
}
}
impl<K: KernelInterface + Kernel + 'static> Mocker<K> {
pub fn new(kernel: K) -> Self {
let mut block = WrappedKernel::new(kernel, BlockId(0));
let mut messages = Vec::new();
let mut message_sinks = Vec::new();
let msg_len = config().queue_size;
for n in K::message_outputs() {
messages.push(Vec::new());
let (tx, rx) = channel(msg_len);
message_sinks.push(rx);
block
.mio
.connect(&PortId::new(*n), tx, &PortId::new("input"))
.unwrap();
}
Mocker {
block,
message_sinks,
messages,
}
}
pub fn post(&mut self, id: impl Into<PortId>, p: Pmt) -> Result<Pmt, Error> {
let id = id.into();
let mut io = WorkIo {
call_again: false,
finished: false,
block_on: None,
};
let WrappedKernel {
meta, mio, kernel, ..
} = &mut self.block;
async_io::block_on(kernel.call_handler(&mut io, mio, meta, id, p))
.map_err(|e| Error::HandlerError(e.to_string()))
}
pub fn run(&mut self) {
crate::async_io::block_on(self.run_async());
}
pub fn init(&mut self) {
crate::async_io::block_on(async {
self.block
.kernel
.init(&mut self.block.mio, &mut self.block.meta)
.await
.unwrap();
});
}
pub fn deinit(&mut self) {
crate::async_io::block_on(async {
self.block
.kernel
.deinit(&mut self.block.mio, &mut self.block.meta)
.await
.unwrap();
});
}
pub fn messages(&self) -> Vec<Vec<Pmt>> {
self.messages.clone()
}
pub fn take_messages(&mut self) -> Vec<Vec<Pmt>> {
std::mem::take(&mut self.messages)
}
pub async fn run_async(&mut self) {
let mut io = WorkIo {
call_again: false,
finished: false,
block_on: None,
};
loop {
self.block
.kernel
.work(&mut io, &mut self.block.mio, &mut self.block.meta)
.await
.unwrap();
for (n, r) in self.message_sinks.iter_mut().enumerate() {
while let Ok(Some(m)) = r.try_next() {
match m {
BlockMessage::Call { data, .. } => {
self.messages[n].push(data);
}
_ => panic!("Mocked Block produced unexpected BlockMessage {m:?}"),
}
}
}
if !io.call_again {
break;
} else {
io.call_again = false;
}
}
}
}
#[derive(Debug)]
pub struct Reader<T: Debug + Send + 'static> {
data: Vec<T>,
tags: Vec<ItemTag>,
block_id: BlockId,
port_id: PortId,
}
impl<T: Debug + Send + 'static> Reader<T> {
pub fn set(&mut self, data: Vec<T>)
where
T: Debug + Send + 'static,
{
self.set_with_tags(data, Vec::new());
}
pub fn set_with_tags(&mut self, data: Vec<T>, tags: Vec<ItemTag>)
where
T: Debug + Send + 'static,
{
self.data = data;
self.tags = tags;
}
}
impl<T: Debug + Send + 'static> Default for Reader<T> {
fn default() -> Self {
Self {
data: vec![],
tags: vec![],
block_id: BlockId(0),
port_id: PortId::new("input"),
}
}
}
#[async_trait]
impl<T: Debug + Send + 'static> BufferReader for Reader<T> {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn init(&mut self, block_id: BlockId, port_id: PortId, _inbox: Sender<BlockMessage>) {
self.block_id = block_id;
self.port_id = port_id;
}
fn validate(&self) -> Result<(), Error> {
Ok(())
}
async fn notify_finished(&mut self) {}
fn finish(&mut self) {}
fn finished(&self) -> bool {
true
}
fn block_id(&self) -> BlockId {
self.block_id
}
fn port_id(&self) -> PortId {
self.port_id.clone()
}
}
impl<T> CpuBufferReader for Reader<T>
where
T: CpuSample,
{
type Item = T;
fn slice(&mut self) -> &[Self::Item] {
self.data.as_slice()
}
fn slice_with_tags(&mut self) -> (&[Self::Item], &Vec<ItemTag>) {
(self.data.as_slice(), &self.tags)
}
fn consume(&mut self, n: usize) {
self.data = self.data.split_off(n);
self.tags.retain(|x| x.index >= n);
for t in self.tags.iter_mut() {
t.index -= n;
}
}
fn set_min_items(&mut self, _n: usize) {
warn!("set_min_items has no effect in with mocker");
}
fn set_min_buffer_size_in_items(&mut self, _n: usize) {
warn!("set_min_buffer_size_in_items has no effect in a mocker");
}
fn max_items(&self) -> usize {
self.data.len()
}
}
#[derive(Debug)]
pub struct Writer<T: Clone + Debug + Send + 'static> {
data: Vec<T>,
tags: Vec<ItemTag>,
block_id: BlockId,
port_id: PortId,
}
impl<T: Clone + Debug + Send + 'static> Default for Writer<T> {
fn default() -> Self {
Self {
data: vec![],
tags: vec![],
block_id: BlockId(0),
port_id: PortId::new("output"),
}
}
}
impl<T: Clone + Debug + Send + 'static> Writer<T> {
pub fn reserve(&mut self, n: usize) {
self.data = Vec::with_capacity(n);
}
pub fn get(&self) -> (Vec<T>, Vec<ItemTag>) {
(self.data.clone(), self.tags.clone())
}
pub fn take(&mut self) -> (Vec<T>, Vec<ItemTag>) {
(
std::mem::take(&mut self.data),
std::mem::take(&mut self.tags),
)
}
}
impl<T: Clone + Debug + Send + 'static> BufferWriter for Writer<T> {
type Reader = Reader<T>;
fn init(&mut self, block_id: BlockId, port_id: PortId, _inbox: Sender<BlockMessage>) {
self.block_id = block_id;
self.port_id = port_id;
}
fn validate(&self) -> Result<(), Error> {
Ok(())
}
fn connect(&mut self, _dest: &mut Self::Reader) {}
async fn notify_finished(&mut self) {}
fn block_id(&self) -> BlockId {
self.block_id
}
fn port_id(&self) -> PortId {
self.port_id.clone()
}
}
impl<T> CpuBufferWriter for Writer<T>
where
T: CpuSample,
{
type Item = T;
fn slice(&mut self) -> &mut [Self::Item] {
unsafe {
std::slice::from_raw_parts_mut(
self.data.as_mut_ptr().add(self.data.len()),
self.data.capacity() - self.data.len(),
)
}
}
fn slice_with_tags(&mut self) -> (&mut [Self::Item], Tags<'_>) {
let s = unsafe {
std::slice::from_raw_parts_mut(
self.data.as_mut_ptr().add(self.data.len()),
self.data.capacity() - self.data.len(),
)
};
(s, Tags::new(&mut self.tags, self.data.len()))
}
fn produce(&mut self, n: usize) {
let curr_len = self.data.len();
unsafe {
self.data.set_len(curr_len + n);
}
}
fn set_min_items(&mut self, _n: usize) {
warn!("set_min_items has no effect in with mocker");
}
fn set_min_buffer_size_in_items(&mut self, _n: usize) {
warn!("set_min_buffer_size_in_items has no effect in a mocker");
}
fn max_items(&self) -> usize {
self.data.len()
}
}