#[cfg(feature = "burn")]
pub mod burn;
pub mod circuit;
pub mod local;
#[doc(hidden)]
pub mod queued;
#[cfg(not(target_arch = "wasm32"))]
pub mod circular;
pub mod slab;
#[cfg(feature = "vulkan")]
pub mod vulkan;
#[cfg(feature = "wgpu")]
pub mod wgpu;
#[cfg(all(feature = "zynq", target_os = "linux"))]
pub mod zynq;
use std::any::Any;
use std::future::Future;
use crate::runtime::dev::BlockInbox;
use crate::runtime::dev::BlockNotifier;
use crate::runtime::dev::ItemTag;
use crate::runtime::dev::Tag;
use futuresdr::runtime::BlockId;
use futuresdr::runtime::Error;
use futuresdr::runtime::PortId;
#[derive(Debug, Clone, Copy, Default)]
pub struct PortConfig {
min_items: Option<usize>,
min_buffer_size_in_items: Option<usize>,
}
impl PortConfig {
pub const fn new() -> Self {
Self {
min_items: None,
min_buffer_size_in_items: None,
}
}
pub const fn with_min_items(min_items: usize) -> Self {
Self {
min_items: Some(min_items),
min_buffer_size_in_items: None,
}
}
pub const fn min_items(&self) -> Option<usize> {
self.min_items
}
pub fn set_min_items(&mut self, min_items: usize) {
self.min_items = Some(min_items);
}
pub fn set_min_items_max(&mut self, min_items: usize) {
self.min_items = Some(self.min_items.unwrap_or(0).max(min_items));
}
pub const fn min_buffer_size_in_items(&self) -> Option<usize> {
self.min_buffer_size_in_items
}
pub fn set_min_buffer_size_in_items(&mut self, min_items: usize) {
self.min_buffer_size_in_items = Some(min_items);
}
pub fn set_min_buffer_size_in_items_max(&mut self, min_items: usize) {
self.min_buffer_size_in_items =
Some(self.min_buffer_size_in_items.unwrap_or(0).max(min_items));
}
}
#[derive(Debug, Clone)]
pub enum PortBinding {
Unbound,
Bound {
block_id: BlockId,
port_id: PortId,
inbox: BlockInbox,
},
}
#[derive(Debug, Clone)]
pub struct PortCore {
binding: PortBinding,
config: PortConfig,
}
impl PortCore {
pub const fn new_disconnected() -> Self {
Self::with_config(PortConfig::new())
}
pub const fn with_config(config: PortConfig) -> Self {
Self {
binding: PortBinding::Unbound,
config,
}
}
pub fn init(&mut self, block_id: BlockId, port_id: PortId, inbox: BlockInbox) {
self.binding = PortBinding::Bound {
block_id,
port_id,
inbox,
};
}
pub fn is_bound(&self) -> bool {
matches!(self.binding, PortBinding::Bound { .. })
}
pub fn binding(&self) -> &PortBinding {
&self.binding
}
pub fn block_id(&self) -> BlockId {
match &self.binding {
PortBinding::Bound { block_id, .. } => *block_id,
PortBinding::Unbound => panic!("port is not bound to a flowgraph"),
}
}
pub fn block_id_if_bound(&self) -> Option<BlockId> {
match &self.binding {
PortBinding::Bound { block_id, .. } => Some(*block_id),
PortBinding::Unbound => None,
}
}
pub fn port_id(&self) -> PortId {
match &self.binding {
PortBinding::Bound { port_id, .. } => port_id.clone(),
PortBinding::Unbound => panic!("port is not bound to a flowgraph"),
}
}
pub fn port_id_if_bound(&self) -> Option<&PortId> {
match &self.binding {
PortBinding::Bound { port_id, .. } => Some(port_id),
PortBinding::Unbound => None,
}
}
pub fn inbox(&self) -> BlockInbox {
match &self.binding {
PortBinding::Bound { inbox, .. } => inbox.clone(),
PortBinding::Unbound => panic!("port is not bound to a flowgraph"),
}
}
pub fn notifier(&self) -> BlockNotifier {
match &self.binding {
PortBinding::Bound { inbox, .. } => inbox.notifier(),
PortBinding::Unbound => panic!("port is not bound to a flowgraph"),
}
}
pub fn min_items(&self) -> Option<usize> {
self.config.min_items()
}
pub fn set_min_items(&mut self, min_items: usize) {
self.config.set_min_items(min_items);
}
pub fn set_min_items_max(&mut self, min_items: usize) {
self.config.set_min_items_max(min_items);
}
pub fn min_buffer_size_in_items(&self) -> Option<usize> {
self.config.min_buffer_size_in_items()
}
pub fn set_min_buffer_size_in_items(&mut self, min_items: usize) {
self.config.set_min_buffer_size_in_items(min_items);
}
pub fn set_min_buffer_size_in_items_max(&mut self, min_items: usize) {
self.config.set_min_buffer_size_in_items_max(min_items);
}
pub fn not_connected_error(&self) -> Error {
match &self.binding {
PortBinding::Bound {
block_id, port_id, ..
} => Error::ValidationError(format!("{block_id:?}:{port_id:?} not connected")),
PortBinding::Unbound => {
Error::ValidationError("stream port is not bound to a flowgraph".to_string())
}
}
}
}
#[derive(Debug, Clone)]
pub struct PortEndpoint {
inbox: BlockInbox,
port_id: PortId,
}
impl PortEndpoint {
pub fn new(inbox: BlockInbox, port_id: PortId) -> Self {
Self { inbox, port_id }
}
pub fn inbox(&self) -> BlockInbox {
self.inbox.clone()
}
pub fn port_id(&self) -> PortId {
self.port_id.clone()
}
}
#[derive(Debug, Clone)]
pub(crate) struct CircuitReturn<Q> {
notifier: BlockNotifier,
queue: Q,
}
impl<Q> CircuitReturn<Q> {
pub(crate) fn new(notifier: BlockNotifier, queue: Q) -> Self {
Self { notifier, queue }
}
pub(crate) fn notify(&self) {
self.notifier.notify();
}
pub(crate) fn queue(&self) -> &Q {
&self.queue
}
}
#[derive(Debug)]
pub enum ConnectionState<T> {
Disconnected,
Connected(T),
}
impl<T> ConnectionState<T> {
pub const fn disconnected() -> Self {
Self::Disconnected
}
pub fn is_connected(&self) -> bool {
matches!(self, Self::Connected(_))
}
pub fn as_ref(&self) -> Option<&T> {
match self {
Self::Disconnected => None,
Self::Connected(value) => Some(value),
}
}
pub fn as_mut(&mut self) -> Option<&mut T> {
match self {
Self::Disconnected => None,
Self::Connected(value) => Some(value),
}
}
pub fn connected(&self) -> &T {
self.as_ref()
.expect("buffer backend is disconnected after validation")
}
pub fn connected_mut(&mut self) -> &mut T {
self.as_mut()
.expect("buffer backend is disconnected after validation")
}
pub fn set_connected(&mut self, value: T) {
*self = Self::Connected(value);
}
pub fn take_connected(&mut self) -> Option<T> {
match std::mem::replace(self, Self::Disconnected) {
Self::Disconnected => None,
Self::Connected(value) => Some(value),
}
}
}
pub trait SendBufferReader: BufferReader<notify_finished(..): Send> + Send {}
pub trait SendBufferWriter:
BufferWriter<Reader: SendBufferReader, notify_finished(..): Send> + Send
{
}
pub trait BufferReader: Any {
fn as_any_mut(&mut self) -> &mut dyn Any;
fn init(&mut self, block_id: BlockId, port_id: PortId, inbox: BlockInbox);
fn validate(&self) -> Result<(), Error>;
fn notify_finished(&mut self) -> impl Future<Output = ()>
where
Self: Sized;
fn finish(&mut self);
fn finished(&self) -> bool;
fn block_id(&self) -> BlockId;
fn port_id(&self) -> PortId;
}
impl<T> SendBufferReader for T where T: BufferReader<notify_finished(..): Send> + Send + 'static {}
pub trait BufferWriter {
type Reader: BufferReader;
fn init(&mut self, block_id: BlockId, port_id: PortId, inbox: BlockInbox);
fn validate(&self) -> Result<(), Error>;
fn connect(&mut self, dest: &mut Self::Reader);
fn connect_dyn(&mut self, dest: &mut dyn BufferReader) -> Result<(), Error> {
if let Some(concrete) = dest.as_any_mut().downcast_mut::<Self::Reader>() {
self.connect(concrete);
Ok(())
} else {
Err(Error::ValidationError(
"dyn BufferReader has wrong type".to_string(),
))
}
}
fn notify_finished(&mut self) -> impl Future<Output = ()>;
fn block_id(&self) -> BlockId;
fn port_id(&self) -> PortId;
}
impl<T> SendBufferWriter for T
where
T: BufferWriter<notify_finished(..): Send> + Send,
T::Reader: SendBufferReader,
{
}
pub trait CircuitWriter: BufferWriter {
type CircuitEnd;
fn close_circuit(&mut self, dst: &mut Self::CircuitEnd);
}
pub trait SendCircuitWriter: CircuitWriter + SendBufferWriter {}
impl<T> SendCircuitWriter for T where T: CircuitWriter + SendBufferWriter {}
pub trait CpuSample: Default + Clone + std::fmt::Debug + Send + Sync + 'static {}
impl<T> CpuSample for T where T: Default + Clone + std::fmt::Debug + Send + Sync + 'static {}
pub trait SendCpuBufferReader: CpuBufferReader + SendBufferReader {}
pub trait CpuBufferReader: BufferReader + Default {
type Item: CpuSample;
fn slice_with_tags(&mut self) -> (&[Self::Item], &Vec<ItemTag>);
fn slice(&mut self) -> &[Self::Item] {
self.slice_with_tags().0
}
fn consume(&mut self, n: usize);
fn set_min_items(&mut self, n: usize);
fn set_min_buffer_size_in_items(&mut self, n: usize);
fn max_items(&self) -> usize;
}
impl<T> SendCpuBufferReader for T where T: CpuBufferReader + SendBufferReader {}
pub trait SendCpuBufferWriter: CpuBufferWriter + SendBufferWriter {}
pub trait CpuBufferWriter: BufferWriter + Default {
type Item: CpuSample;
fn slice_with_tags(&mut self) -> (&mut [Self::Item], Tags<'_>);
fn slice(&mut self) -> &mut [Self::Item] {
self.slice_with_tags().0
}
fn produce(&mut self, n: usize);
fn set_min_items(&mut self, n: usize);
fn set_min_buffer_size_in_items(&mut self, n: usize);
fn max_items(&self) -> usize;
}
impl<T> SendCpuBufferWriter for T where T: CpuBufferWriter + SendBufferWriter {}
pub trait InplaceBuffer {
type Item: CpuSample;
fn set_valid(&mut self, valid: usize);
fn slice(&mut self) -> &mut [Self::Item];
fn slice_with_tags(&mut self) -> (&mut [Self::Item], &mut Vec<ItemTag>);
}
pub trait InplaceReader: BufferReader + Default {
type Item: CpuSample;
type Buffer: InplaceBuffer<Item = Self::Item>;
fn get_full_buffer(&mut self) -> Option<Self::Buffer>;
fn has_more_buffers(&mut self) -> bool;
fn put_empty_buffer(&mut self, buffer: Self::Buffer);
fn notify_consumed_buffer(&mut self);
}
pub trait InplaceWriter: BufferWriter + Default {
type Item: CpuSample;
type Buffer: InplaceBuffer<Item = Self::Item>;
fn put_full_buffer(&mut self, buffer: Self::Buffer);
fn get_empty_buffer(&mut self) -> Option<Self::Buffer>;
fn has_more_buffers(&mut self) -> bool;
fn inject_buffers(&mut self, n_buffers: usize) {
let n_items =
futuresdr::runtime::config::config().buffer_size / std::mem::size_of::<Self::Item>();
self.inject_buffers_with_items(n_buffers, n_items);
}
fn inject_buffers_with_items(&mut self, n_buffers: usize, n_items: usize);
}
pub trait SendInplaceReader: InplaceReader + SendBufferReader {}
impl<T> SendInplaceReader for T where T: InplaceReader + SendBufferReader {}
pub trait SendInplaceWriter: InplaceWriter + SendBufferWriter {}
impl<T> SendInplaceWriter for T where T: InplaceWriter + SendBufferWriter {}
#[cfg(not(target_arch = "wasm32"))]
pub type DefaultCpuReader<D> = circular::Reader<D>;
#[cfg(not(target_arch = "wasm32"))]
pub type DefaultCpuWriter<D> = circular::Writer<D>;
#[cfg(target_arch = "wasm32")]
pub type DefaultCpuReader<D> = slab::Reader<D>;
#[cfg(target_arch = "wasm32")]
pub type DefaultCpuWriter<D> = slab::Writer<D>;
pub type LocalCpuReader<D> = local::Reader<D>;
pub type LocalCpuWriter<D> = local::Writer<D>;
pub struct Tags<'a> {
tags: &'a mut Vec<ItemTag>,
offset: usize,
}
impl<'a> Tags<'a> {
pub fn new(tags: &'a mut Vec<ItemTag>, offset: usize) -> Self {
Self { tags, offset }
}
pub fn add_tag(&mut self, index: usize, tag: Tag) {
self.tags.push(ItemTag {
index: index + self.offset,
tag,
});
}
}