use futuresdr::channel::mpsc;
use futuresdr::channel::oneshot;
use futuresdr_types::PmtConversionError;
use std::fmt;
use std::fmt::Display;
use std::fmt::Formatter;
use thiserror::Error;
// Submodules of the runtime. Items from the private modules are exposed through the `pub use`
// re-exports found elsewhere in this file; `pub mod` modules are reachable directly.
mod block;
mod block_meta;
pub mod buffer;
pub mod config;
// `ctrl_port` has two compile-time alternatives: the default module file on every target except
// wasm32, and `ctrl_port_wasm.rs` on wasm32. Exactly one of the two declarations is active.
#[cfg(not(target_arch = "wasm32"))]
mod ctrl_port;
#[cfg(target_arch = "wasm32")]
#[path = "ctrl_port_wasm.rs"]
mod ctrl_port;
use crate::runtime::ctrl_port::ControlPort;
// `logging` has three compile-time alternatives: the default module file (neither wasm32 nor
// Android), `logging_android.rs` (Android), and `logging_wasm.rs` (wasm32).
#[cfg(all(not(target_arch = "wasm32"), not(target_os = "android")))]
mod logging;
#[cfg(target_os = "android")]
#[path = "logging_android.rs"]
mod logging;
#[cfg(target_arch = "wasm32")]
#[path = "logging_wasm.rs"]
mod logging;
mod flowgraph;
mod flowgraph_handle;
mod kernel;
mod message_io;
// `mocker` is not compiled for wasm32.
#[cfg(not(target_arch = "wasm32"))]
pub mod mocker;
// This module is itself named `runtime` (see the `crate::runtime::...` path above), hence the
// lint exception for the identically named child module.
#[allow(clippy::module_inception)]
mod runtime;
pub mod scheduler;
mod tag;
mod work_io;
// Public API: items re-exported from the private submodules of this module.
pub use block::Block;
pub use block::WrappedKernel;
pub use block_meta::BlockMeta;
pub use flowgraph::BlockRef;
pub use flowgraph::Flowgraph;
pub use flowgraph_handle::FlowgraphHandle;
pub use kernel::Kernel;
pub use kernel::KernelInterface;
pub use message_io::MessageOutput;
pub use message_io::MessageOutputs;
pub use runtime::Runtime;
pub use runtime::RuntimeHandle;
pub use tag::ItemTag;
pub use tag::Tag;
pub use work_io::WorkIo;
// Types re-exported from the `futuresdr_types` crate.
pub use futuresdr_types::BlockDescription;
pub use futuresdr_types::BlockId;
pub use futuresdr_types::FlowgraphDescription;
pub use futuresdr_types::FlowgraphId;
pub use futuresdr_types::Pmt;
pub use futuresdr_types::PmtKind;
pub use futuresdr_types::PortId;
// Private imports (not re-exported). They are not referenced in the part of the file visible
// here — presumably used by child modules; confirm before removing.
use buffer::BufferReader;
use buffer::BufferWriter;
pub type Result<T, E = anyhow::Error> = anyhow::Result<T, E>;
/// Initializes logging by calling `logging::init()`.
///
/// `logging` resolves to one of several target-specific modules, selected by the `#[cfg]`
/// attributes on its declarations in this file.
pub fn init() {
logging::init();
}
/// Flowgraph-level message type.
///
/// NOTE(review): neither the sender nor the receiver of these messages is visible in this file.
/// The variant descriptions are derived from the variant names and payload types — confirm
/// against the flowgraph implementation.
#[derive(Debug)]
pub enum FlowgraphMessage {
/// Termination message.
Terminate,
/// Initialization-completed message (presumably sent once a block finished initializing —
/// confirm).
Initialized,
/// The block identified by `block_id` is done.
BlockDone {
block_id: BlockId,
},
/// The block identified by `block_id` encountered an error.
BlockError {
block_id: BlockId,
},
/// Call addressed to port `port_id` of block `block_id` with payload `data`.
///
/// The reply sent through `tx` carries no value: it is either `Ok(())` or an [`Error`].
BlockCall {
block_id: BlockId,
port_id: PortId,
data: Pmt,
tx: oneshot::Sender<Result<(), Error>>,
},
/// Call addressed to port `port_id` of block `block_id` with payload `data`.
///
/// Unlike `BlockCall`, the reply sent through `tx` carries a [`Pmt`] on success.
BlockCallback {
block_id: BlockId,
port_id: PortId,
data: Pmt,
tx: oneshot::Sender<Result<Pmt, Error>>,
},
/// Request for a [`FlowgraphDescription`], which is delivered through `tx`.
FlowgraphDescription {
tx: oneshot::Sender<FlowgraphDescription>,
},
/// Request for the [`BlockDescription`] of block `block_id`; `tx` carries the description or
/// an [`Error`].
BlockDescription {
block_id: BlockId,
tx: oneshot::Sender<Result<BlockDescription, Error>>,
},
}
/// Block-level message type.
///
/// NOTE(review): neither the sender nor the receiver of these messages is visible in this file.
/// The variant descriptions are derived from the variant names and payload types — confirm
/// against the block implementation.
#[derive(Debug)]
pub enum BlockMessage {
/// Initialization message.
Initialize,
/// Termination message.
Terminate,
/// Notification without payload (presumably a wake-up for the block — confirm).
Notify,
/// Request for the block's [`BlockDescription`], which is delivered through `tx`.
BlockDescription {
tx: oneshot::Sender<BlockDescription>,
},
/// The stream input port `input_id` is done.
StreamInputDone {
input_id: PortId,
},
/// The stream output port `output_id` is done.
StreamOutputDone {
output_id: PortId,
},
/// Call addressed to port `port_id` with payload `data`; there is no reply channel.
Call {
port_id: PortId,
data: Pmt,
},
/// Call addressed to port `port_id` with payload `data`; `tx` carries back a [`Pmt`] or an
/// [`Error`].
Callback {
port_id: PortId,
data: Pmt,
tx: oneshot::Sender<Result<Pmt, Error>>,
},
}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum Error {
#[error("Block {:?} does not exist", 0)]
InvalidBlock(BlockId),
#[error("Flowgraph terminated")]
FlowgraphTerminated,
#[error("Block '{0}' does not have message port '{1:?}'")]
InvalidMessagePort(BlockPortCtx, PortId),
#[error("Block '{0}' does not have stream port '{1:?}'")]
InvalidStreamPort(BlockPortCtx, PortId),
#[error("Invalid Parameter")]
InvalidParameter,
#[error("Error in message handler: {0}")]
HandlerError(String),
#[error("Block already terminated")]
BlockTerminated,
#[error("Runtime error ({0})")]
RuntimeError(String),
#[error("Validation error {0}")]
ValidationError(String),
#[error("PMT conversion error")]
PmtConversionError,
#[error("A Block with an instance name of '{0}' already exists")]
DuplicateBlockName(String),
#[error("Error while locking a Mutex that should not be contended or poisoned")]
LockError,
#[cfg(feature = "seify")]
#[error("Seify Args conversion error")]
SeifyArgsConversionError,
#[cfg(feature = "seify")]
#[error("Seify error ({0})")]
SeifyError(String),
}
/// Wraps a Seify error as [`Error::SeifyError`], keeping only its `Display` text.
#[cfg(feature = "seify")]
impl From<seify::Error> for Error {
    fn from(err: seify::Error) -> Self {
        let message = err.to_string();
        Self::SeifyError(message)
    }
}
/// Maps a canceled oneshot receive to [`Error::RuntimeError`] with a fixed message.
impl From<oneshot::Canceled> for Error {
    fn from(_: oneshot::Canceled) -> Self {
        let reason =
            String::from("Couldn't receive from oneshot channel, sender dropped unexpectedly");
        Self::RuntimeError(reason)
    }
}
/// Maps a failed mpsc send to [`Error::RuntimeError`] with a fixed message.
///
/// The details of the original error are discarded.
impl From<mpsc::SendError> for Error {
    fn from(_: mpsc::SendError) -> Self {
        let reason = String::from("Couldn't send to mpsc channel, receiver dropped unexpectedly");
        Self::RuntimeError(reason)
    }
}
/// Maps a failed non-blocking mpsc send to [`Error::RuntimeError`] with a fixed message.
///
/// The unsent message of type `T` and any further detail of the original error are discarded.
impl<T> From<mpsc::TrySendError<T>> for Error {
    fn from(_: mpsc::TrySendError<T>) -> Self {
        let reason = String::from("Couldn't send to mpsc channel, receiver dropped unexpectedly");
        Self::RuntimeError(reason)
    }
}
impl From<PmtConversionError> for Error {
fn from(_value: PmtConversionError) -> Self {
Error::PmtConversionError
}
}
/// Identifies the block in the port-related errors `Error::InvalidMessagePort` and
/// `Error::InvalidStreamPort`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockPortCtx {
/// No block information; displayed as `<None>`.
None,
/// Block identified by its id; displayed with the id's `Debug` formatting.
Id(BlockId),
/// Block identified by a name; displayed verbatim.
Name(String),
}
/// Builds a [`BlockPortCtx::Name`] from the value returned by the block's `type_name()`.
impl From<&dyn Block> for BlockPortCtx {
    fn from(block: &dyn Block) -> Self {
        let name = block.type_name().into();
        Self::Name(name)
    }
}
/// Text form used inside the port-related error messages.
///
/// `None` prints `<None>`, `Id` prints the id with `Debug` formatting, and `Name` prints the
/// name unchanged.
impl Display for BlockPortCtx {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Self::None => f.write_str("<None>"),
            Self::Id(block_id) => write!(f, "{block_id:?}"),
            Self::Name(block_name) => f.write_str(block_name),
        }
    }
}
/// Small future helpers.
mod futures {
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;

    /// Returns a future that is pending on its first poll and ready on every later poll.
    pub fn yield_now() -> YieldNow {
        YieldNow(false)
    }

    /// Future created by [`yield_now`].
    ///
    /// The wrapped flag records whether the future has already been polled once.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct YieldNow(bool);

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Mark the future as polled and learn whether it had been polled before.
            let polled_before = std::mem::replace(&mut self.0, true);
            if polled_before {
                return Poll::Ready(());
            }
            // First poll: wake the task right away so it gets polled again, then report
            // pending.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}