use futuresdr_types::PmtConversionError;
use std::fmt;
use std::fmt::Display;
use std::fmt::Formatter;
use thiserror::Error;
use crate::runtime::channel::mpsc;
use crate::runtime::channel::oneshot;
mod block;
mod block_inbox;
mod block_meta;
pub mod buffer;
pub mod channel;
pub mod config;
mod connect_add;
pub mod dev;
#[cfg(not(target_arch = "wasm32"))]
mod ctrl_port;
#[cfg(not(target_arch = "wasm32"))]
use crate::runtime::ctrl_port::ControlPort;
#[cfg(all(not(target_arch = "wasm32"), not(target_os = "android")))]
mod logging;
#[cfg(target_os = "android")]
#[path = "logging_android.rs"]
mod logging;
#[cfg(target_arch = "wasm32")]
#[path = "logging_wasm.rs"]
mod logging;
mod flowgraph;
mod flowgraph_handle;
mod flowgraph_task;
mod kernel;
mod kernel_interface;
#[cfg(not(target_arch = "wasm32"))]
mod local_domain;
#[cfg(target_arch = "wasm32")]
#[path = "local_domain_wasm.rs"]
mod local_domain;
mod local_domain_common;
mod message_output;
#[cfg(not(target_arch = "wasm32"))]
pub mod mocker;
mod running_flowgraph;
#[allow(clippy::module_inception)]
mod runtime;
pub mod scheduler;
mod tag;
mod timer;
mod work_io;
mod wrapped_kernel;
pub mod macros {
pub use futuresdr_macros::Block;
pub use futuresdr_macros::connect;
pub use futuresdr_macros::connect_async;
}
pub use flowgraph::BlockRef;
pub use flowgraph::Flowgraph;
pub use flowgraph::LocalDomain;
pub use flowgraph::LocalDomainContext;
pub use flowgraph_handle::FlowgraphBlockHandle;
pub use flowgraph_handle::FlowgraphHandle;
pub use flowgraph_task::FlowgraphTask;
pub use running_flowgraph::RunningFlowgraph;
pub use runtime::DefaultScheduler;
pub use runtime::Runtime;
pub use runtime::RuntimeHandle;
pub use timer::Timer;
pub use futuresdr_types::BlockDescription;
pub use futuresdr_types::BlockId;
pub use futuresdr_types::FlowgraphDescription;
pub use futuresdr_types::FlowgraphId;
pub use futuresdr_types::Pmt;
pub use futuresdr_types::PmtKind;
pub use futuresdr_types::PortId;
#[derive(Debug, Clone)]
pub(crate) struct Edge {
pub(crate) src_block: BlockId,
pub(crate) src_port: PortId,
pub(crate) dst_block: BlockId,
pub(crate) dst_port: PortId,
}
impl Edge {
pub(crate) fn new(
src_block: BlockId,
src_port: PortId,
dst_block: BlockId,
dst_port: PortId,
) -> Self {
Self {
src_block,
src_port,
dst_block,
dst_port,
}
}
pub(crate) fn endpoints(&self) -> (BlockId, PortId, BlockId, PortId) {
(
self.src_block,
self.src_port.clone(),
self.dst_block,
self.dst_port.clone(),
)
}
}
#[cfg(not(target_arch = "wasm32"))]
pub fn block_on<T>(future: impl std::future::Future<Output = T>) -> T {
async_io::block_on(future)
}
#[doc(hidden)]
pub mod __private {
pub use super::connect_add::ConnectAdd;
pub use super::connect_add::ConnectAddAsync;
pub use super::kernel_interface::KernelInterface;
pub use super::kernel_interface::SendKernelInterface;
}
pub type Result<T, E = anyhow::Error> = anyhow::Result<T, E>;
#[cfg(target_arch = "wasm32")]
pub(crate) fn yield_now() -> impl std::future::Future<Output = ()> + Unpin {
WasmYieldNow(false)
}
#[cfg(target_arch = "wasm32")]
pub(crate) fn wasm_event_loop_yield() -> impl std::future::Future<Output = ()> + Send + Unpin {
WasmEventLoopYield(false)
}
#[cfg(target_arch = "wasm32")]
struct WasmEventLoopYield(bool);
#[cfg(target_arch = "wasm32")]
impl std::future::Future for WasmEventLoopYield {
type Output = ();
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if !self.0 {
self.0 = true;
let waker = cx.waker().clone();
let callback = wasm_bindgen::closure::Closure::once_into_js(move || waker.wake());
let function = wasm_bindgen::JsCast::unchecked_ref::<js_sys::Function>(&callback);
if let Some(window) = web_sys::window() {
let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(function, 1);
} else if let Ok(scope) =
wasm_bindgen::JsCast::dyn_into::<web_sys::WorkerGlobalScope>(js_sys::global())
{
let _ = scope.set_timeout_with_callback_and_timeout_and_arguments_0(function, 1);
} else {
cx.waker().wake_by_ref();
}
std::task::Poll::Pending
} else {
std::task::Poll::Ready(())
}
}
}
#[cfg(target_arch = "wasm32")]
struct WasmYieldNow(bool);
#[cfg(target_arch = "wasm32")]
impl std::future::Future for WasmYieldNow {
type Output = ();
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if !self.0 {
self.0 = true;
cx.waker().wake_by_ref();
std::task::Poll::Pending
} else {
std::task::Poll::Ready(())
}
}
}
pub fn init() {
#[cfg(target_arch = "wasm32")]
console_error_panic_hook::set_once();
logging::init();
}
#[doc(hidden)]
#[derive(Debug)]
pub enum FlowgraphMessage {
Terminate,
Initialized,
BlockDone {
block_id: BlockId,
},
BlockError {
block_id: BlockId,
},
BlockCall {
block_id: BlockId,
port_id: PortId,
data: Pmt,
tx: oneshot::Sender<Result<(), Error>>,
},
BlockCallback {
block_id: BlockId,
port_id: PortId,
data: Pmt,
tx: oneshot::Sender<Result<Pmt, Error>>,
},
FlowgraphDescription {
tx: oneshot::Sender<FlowgraphDescription>,
},
BlockDescription {
block_id: BlockId,
tx: oneshot::Sender<Result<BlockDescription, Error>>,
},
}
#[doc(hidden)]
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) enum BlockMessage {
Initialize,
Terminate,
BlockDescription {
tx: oneshot::Sender<BlockDescription>,
},
StreamInputDone {
input_id: PortId,
},
StreamOutputDone {
output_id: PortId,
},
Call {
port_id: PortId,
data: Pmt,
},
Callback {
port_id: PortId,
data: Pmt,
tx: oneshot::Sender<Result<Pmt, Error>>,
},
}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum Error {
#[error("Block {:?} does not exist", 0)]
InvalidBlock(BlockId),
#[error("Flowgraph terminated")]
FlowgraphTerminated,
#[error("Block '{0}' does not have message port '{1:?}'")]
InvalidMessagePort(BlockPortCtx, PortId),
#[error("Block '{0}' does not have stream port '{1:?}'")]
InvalidStreamPort(BlockPortCtx, PortId),
#[error("Invalid Parameter")]
InvalidParameter,
#[error("Error in message handler: {0}")]
HandlerError(String),
#[error("Block already terminated")]
BlockTerminated,
#[error("Runtime error ({0})")]
RuntimeError(String),
#[error("Validation error {0}")]
ValidationError(String),
#[error("PMT conversion error")]
PmtConversionError,
#[error("A Block with an instance name of '{0}' already exists")]
DuplicateBlockName(String),
#[error("Error while locking a Mutex that should not be contended or poisoned")]
LockError,
#[cfg(feature = "seify")]
#[error("Seify Args conversion error")]
SeifyArgsConversionError,
#[cfg(feature = "seify")]
#[error("Seify error ({0})")]
SeifyError(String),
}
#[cfg(feature = "seify")]
impl From<seify::Error> for Error {
fn from(value: seify::Error) -> Self {
Error::SeifyError(value.to_string())
}
}
impl From<oneshot::Canceled> for Error {
fn from(_value: oneshot::Canceled) -> Self {
Error::RuntimeError(
"Couldn't receive from oneshot channel, sender dropped unexpectedly".to_string(),
)
}
}
impl From<mpsc::SendError> for Error {
fn from(_value: mpsc::SendError) -> Self {
Error::RuntimeError(
"Couldn't send to mpsc channel, receiver dropped unexpectedly".to_string(),
)
}
}
impl<T> From<mpsc::TrySendError<T>> for Error {
fn from(_value: mpsc::TrySendError<T>) -> Self {
let message = match _value {
mpsc::TrySendError::Full(_) => "Couldn't send to mpsc channel, channel is full",
mpsc::TrySendError::Disconnected(_) => {
"Couldn't send to mpsc channel, receiver dropped unexpectedly"
}
};
Error::RuntimeError(message.to_string())
}
}
impl From<PmtConversionError> for Error {
fn from(_value: PmtConversionError) -> Self {
Error::PmtConversionError
}
}
#[doc(hidden)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockPortCtx {
None,
Id(BlockId),
Name(String),
}
#[cfg(not(target_arch = "wasm32"))]
impl From<&dyn crate::runtime::dev::Block> for BlockPortCtx {
fn from(value: &dyn crate::runtime::dev::Block) -> Self {
BlockPortCtx::Name(value.type_name().into())
}
}
impl Display for BlockPortCtx {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
BlockPortCtx::None => write!(f, "<None>"),
BlockPortCtx::Id(id) => write!(f, "{id:?}"),
BlockPortCtx::Name(name) => write!(f, "{name}"),
}
}
}