use std::{fmt, sync::Arc, time::Duration};
use zng_time::{Deadline, INSTANT};
mod ipc;
pub use ipc::{IpcReceiver, IpcSender, IpcValue, NamedIpcReceiver, NamedIpcSender, ipc_unbounded};
mod ipc_bytes;
pub use ipc_bytes::{
IpcBytes, IpcBytesCast, IpcBytesCastIntoIter, IpcBytesIntoIter, IpcBytesMut, IpcBytesMutCast, IpcBytesWriter, IpcBytesWriterBlocking,
WeakIpcBytes, cleanup_memmap_storage,
};
#[cfg(ipc)]
pub use ipc_bytes::{is_ipc_serialization, with_ipc_serialization};
use zng_txt::ToTxt;
/// Sending end of a channel created by [`unbounded`], [`bounded`] or [`rendezvous`].
///
/// This is a newtype over [`flume::Sender`]; conversions in both directions are provided
/// through `From` implementations.
pub struct Sender<T>(flume::Sender<T>);
impl<T> fmt::Debug for Sender<T> {
    /// Writes `Sender<TYPE>`, where `TYPE` is the pretty-printed name of the message type.
    ///
    /// Only the type name is printed, so no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let msg_type = pretty_type_name::pretty_type_name::<T>();
        write!(f, "Sender<{msg_type}>")
    }
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender(self.0.clone())
}
}
impl<T> From<flume::Sender<T>> for Sender<T> {
fn from(s: flume::Sender<T>) -> Self {
Sender(s)
}
}
impl<T> From<Sender<T>> for flume::Sender<T> {
fn from(s: Sender<T>) -> Self {
s.0
}
}
impl<T> Sender<T> {
    /// Sends `msg` asynchronously using flume's `send_async`.
    ///
    /// # Errors
    ///
    /// A flume send error is converted to [`ChannelError`] through its `From` implementation.
    pub async fn send(&self, msg: T) -> Result<(), ChannelError> {
        self.0.send_async(msg).await.map_err(ChannelError::from)
    }

    /// Sends `msg` asynchronously, giving up once `deadline` is reached.
    ///
    /// # Errors
    ///
    /// Returns [`ChannelError::Timeout`] when the deadline wrapper reports an error, otherwise
    /// whatever [`send`](Self::send) returned.
    pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
        match super::with_deadline(self.send(msg), deadline).await {
            // The send finished first, its own result is passed through unchanged.
            Ok(sent) => sent,
            Err(_) => Err(ChannelError::Timeout),
        }
    }

    /// Sends `msg` using flume's blocking `send`.
    ///
    /// # Errors
    ///
    /// A flume send error is converted to [`ChannelError`] through its `From` implementation.
    pub fn send_blocking(&self, msg: T) -> Result<(), ChannelError> {
        self.0.send(msg).map_err(ChannelError::from)
    }

    /// Blocking variant of [`send_deadline`](Self::send_deadline).
    ///
    /// Drives the async operation to completion on the current thread with `block_on`.
    pub fn send_deadline_blocking(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
        let pending = self.send_deadline(msg, deadline);
        super::block_on(pending)
    }

    /// Returns `true` when the underlying flume channel reports that it is empty.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}
/// Receiving end of a channel created by [`unbounded`], [`bounded`] or [`rendezvous`].
///
/// This is a newtype over [`flume::Receiver`].
pub struct Receiver<T>(flume::Receiver<T>);
impl<T> fmt::Debug for Receiver<T> {
    /// Writes `Receiver<TYPE>`, where `TYPE` is the pretty-printed name of the message type.
    ///
    /// Only the type name is printed, so no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let msg_type = pretty_type_name::pretty_type_name::<T>();
        write!(f, "Receiver<{msg_type}>")
    }
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
Receiver(self.0.clone())
}
}
impl<T> Receiver<T> {
    /// Waits asynchronously for the next message using flume's `recv_async`.
    ///
    /// # Errors
    ///
    /// A flume receive error is converted to [`ChannelError`] through its `From` implementation.
    pub async fn recv(&self) -> Result<T, ChannelError> {
        self.0.recv_async().await.map_err(ChannelError::from)
    }

    /// Waits asynchronously for the next message, giving up once `deadline` is reached.
    ///
    /// # Errors
    ///
    /// Returns [`ChannelError::Timeout`] when the deadline wrapper reports an error, otherwise
    /// whatever [`recv`](Self::recv) returned.
    pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
        match super::with_deadline(self.recv(), deadline).await {
            // The receive finished first, its own result is passed through unchanged.
            Ok(received) => received,
            Err(_) => Err(ChannelError::Timeout),
        }
    }

    /// Waits for the next message using flume's blocking `recv`.
    ///
    /// # Errors
    ///
    /// A flume receive error is converted to [`ChannelError`] through its `From` implementation.
    pub fn recv_blocking(&self) -> Result<T, ChannelError> {
        self.0.recv().map_err(ChannelError::from)
    }

    /// Blocks the current thread until a message arrives or `deadline` is reached.
    ///
    /// # Errors
    ///
    /// Returns [`ChannelError::Timeout`] when the deadline is reached first, or the converted
    /// flume error when the blocking receive fails for another reason.
    pub fn recv_deadline_blocking(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
        // Convert once here so the worker below is not generic over the deadline type.
        let deadline: Deadline = deadline.into();
        self.recv_deadline_blocking_impl(deadline)
    }

    /// Non-generic worker behind [`recv_deadline_blocking`](Self::recv_deadline_blocking).
    ///
    /// Every loop iteration recomputes the time left until `deadline` and picks a waiting strategy:
    ///
    /// 1. Deadline already passed: return `Timeout`.
    /// 2. `INSTANT` is in manual mode: `recv_timeout` for the time left minus `WORST_SLEEP_ERR`
    ///    (zero if that would be negative).
    /// 3. More than `WORST_SLEEP_ERR` left: blocking receive that stops `WORST_SLEEP_ERR` early.
    /// 4. More than `WORST_SPIN_ERR` left: poll `try_recv`, yielding the thread between attempts,
    ///    until `WORST_SPIN_ERR` before the deadline.
    /// 5. Otherwise: yield until the deadline has elapsed (the channel is not polled) and
    ///    return `Timeout`.
    fn recv_deadline_blocking_impl(&self, deadline: Deadline) -> Result<T, ChannelError> {
        // Safety margins subtracted from the blocking/spinning waits. The names suggest they are
        // the worst expected overshoot of a sleep and of a spin iteration; the values differ on Windows.
        const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
        const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });

        loop {
            let Some(remaining) = deadline.0.checked_duration_since(INSTANT.now()) else {
                return Err(ChannelError::Timeout);
            };

            if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
                // Manual time: wait by relative duration rather than by absolute instant.
                match self.0.recv_timeout(remaining.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
                    // flume timed out, re-evaluate the time left.
                    Err(flume::RecvTimeoutError::Timeout) => continue,
                    outcome => return outcome.map_err(ChannelError::from),
                }
            } else if remaining > WORST_SLEEP_ERR {
                // Plenty of time left: block inside flume, waking up `WORST_SLEEP_ERR` early.
                // The `unwrap` panics if the deadline minus the margin is not representable.
                #[cfg(not(target_arch = "wasm32"))]
                match self.0.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
                    Err(flume::RecvTimeoutError::Timeout) => continue,
                    outcome => return outcome.map_err(ChannelError::from),
                }
                // NOTE(review): wasm32 uses the relative-timeout call instead, presumably because
                // `recv_deadline` is not usable on that target — confirm.
                #[cfg(target_arch = "wasm32")]
                match self.0.recv_timeout(remaining.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
                    Err(flume::RecvTimeoutError::Timeout) => continue,
                    outcome => return outcome.map_err(ChannelError::from),
                }
            } else if remaining > WORST_SPIN_ERR {
                // Close to the deadline: poll without blocking until `WORST_SPIN_ERR` before it.
                let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
                while !spin_deadline.has_elapsed() {
                    match self.0.try_recv() {
                        Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
                        outcome => return outcome.map_err(ChannelError::from),
                    }
                }
                continue;
            } else {
                // Final stretch: only wait out the deadline, the channel is not checked again.
                while !deadline.has_elapsed() {
                    std::thread::yield_now();
                }
                return Err(ChannelError::Timeout);
            }
        }
    }

    /// Attempts to take a message without blocking.
    ///
    /// Returns `Ok(Some(msg))` when a message was available and `Ok(None)` when flume reports
    /// the channel as empty.
    ///
    /// # Errors
    ///
    /// Returns [`ChannelError::disconnected`] when flume reports the channel as disconnected.
    pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
        match self.0.try_recv() {
            Ok(msg) => Ok(Some(msg)),
            Err(flume::TryRecvError::Empty) => Ok(None),
            Err(flume::TryRecvError::Disconnected) => Err(ChannelError::disconnected()),
        }
    }

    /// Returns the iterator produced by flume's `Receiver::iter`.
    pub fn iter(&self) -> impl Iterator<Item = T> {
        self.0.iter()
    }

    /// Returns the iterator produced by flume's `Receiver::try_iter`.
    pub fn try_iter(&self) -> impl Iterator<Item = T> {
        self.0.try_iter()
    }

    /// Returns `true` when the underlying flume channel reports that it is empty.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}
/// Creates a channel backed by `flume::unbounded` and returns its wrapped sender and receiver.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = flume::unbounded();
    (Sender(tx), Receiver(rx))
}
/// Creates a channel backed by `flume::bounded(capacity)` and returns its wrapped sender and receiver.
pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = flume::bounded(capacity);
    (Sender(tx), Receiver(rx))
}
/// Creates a [`bounded`] channel with a capacity of zero.
pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
    // The message type is inferred from the return type.
    bounded(0)
}
/// Error produced by the channel send and receive operations.
#[derive(Clone, Debug)]
pub enum ChannelError {
    /// The channel is disconnected.
    Disconnected {
        /// Error that caused the disconnection, when one is known.
        ///
        /// Held in an `Arc` so the whole error stays cheaply cloneable.
        cause: Option<Arc<dyn std::error::Error + Send + Sync + 'static>>,
    },
    /// The deadline was reached before the operation completed.
    Timeout,
}
impl ChannelError {
pub fn disconnected() -> Self {
ChannelError::Disconnected { cause: None }
}
pub fn disconnected_by(cause: impl std::error::Error + Send + Sync + 'static) -> Self {
ChannelError::Disconnected {
cause: Some(Arc::new(cause)),
}
}
}
impl fmt::Display for ChannelError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ChannelError::Disconnected { cause: source } => match source {
Some(e) => write!(f, "channel disconnected due to, {e}"),
None => write!(f, "channel disconnected"),
},
ChannelError::Timeout => write!(f, "deadline elapsed before message could be transferred"),
}
}
}
impl std::error::Error for ChannelError {
    /// Returns the error that caused a disconnection, if one was recorded.
    ///
    /// The reference points at the error stored inside the `Arc`, not at the `Arc` itself, so
    /// `is::<E>()` and `downcast_ref::<E>()` on the returned source see the concrete cause type.
    /// Returns `None` for a disconnection without a cause and for timeouts.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Disconnected { cause: Some(e) } => {
                // Deref through the `Arc` explicitly; coercing `&Arc<dyn Error>` directly would
                // produce a trait object whose concrete type is the `Arc` wrapper.
                let inner: &(dyn std::error::Error + 'static) = &**e;
                Some(inner)
            }
            Self::Disconnected { cause: None } | Self::Timeout => None,
        }
    }
}
impl PartialEq for ChannelError {
    /// Compares two errors.
    ///
    /// Timeouts are equal to each other. Disconnections are equal when both have no cause, or
    /// when both have a cause and the causes convert to the same text. Different variants are
    /// never equal.
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Timeout, Self::Timeout) => true,
            (Self::Disconnected { cause: lhs }, Self::Disconnected { cause: rhs }) => match (lhs, rhs) {
                // Causes are type-erased, so their textual form is what gets compared.
                (Some(a), Some(b)) => a.to_txt() == b.to_txt(),
                (None, None) => true,
                (Some(_), None) | (None, Some(_)) => false,
            },
            (Self::Timeout, Self::Disconnected { .. }) | (Self::Disconnected { .. }, Self::Timeout) => false,
        }
    }
}
// Marker impl: declares the hand-written `PartialEq` above to be a full equivalence relation.
impl Eq for ChannelError {}
impl From<flume::RecvError> for ChannelError {
    /// Maps flume's blocking/async receive error to a disconnection without cause.
    fn from(err: flume::RecvError) -> Self {
        // Kept as a `match` so a new flume variant becomes a compile error here.
        match err {
            flume::RecvError::Disconnected => Self::disconnected(),
        }
    }
}
impl From<flume::RecvTimeoutError> for ChannelError {
fn from(value: flume::RecvTimeoutError) -> Self {
match value {
flume::RecvTimeoutError::Timeout => ChannelError::Timeout,
flume::RecvTimeoutError::Disconnected => ChannelError::disconnected(),
}
}
}
impl<T> From<flume::SendError<T>> for ChannelError {
    /// Maps any flume send error to a disconnection without cause.
    ///
    /// The error value itself (and anything it carries) is discarded.
    fn from(_send_err: flume::SendError<T>) -> Self {
        Self::disconnected()
    }
}
impl From<flume::TryRecvError> for ChannelError {
fn from(value: flume::TryRecvError) -> Self {
match value {
flume::TryRecvError::Empty => ChannelError::Timeout,
flume::TryRecvError::Disconnected => ChannelError::disconnected(),
}
}
}