#![doc = include_str!("../README.md")]
#![warn(missing_docs, missing_debug_implementations)]
pub(crate) mod backoff;
pub(crate) mod internal;
#[cfg(not(feature = "std-mutex"))]
pub(crate) mod mutex;
pub(crate) mod pointer;
mod error;
#[cfg(feature = "async")]
mod future;
mod signal;
pub use error::*;
#[cfg(feature = "async")]
pub use future::*;
#[cfg(feature = "async")]
use core::mem::transmute;
use core::{
fmt,
mem::{needs_drop, size_of, MaybeUninit},
time::Duration,
};
use std::time::Instant;
use internal::{acquire_internal, try_acquire_internal, ChannelInternal, Internal};
use pointer::KanalPtr;
use signal::*;
/// Sending side of a channel, exposing a synchronous `send` API.
///
/// The struct only wraps a handle to the channel state (`Internal<T>`);
/// cloning and dropping a `Sender` adjust the sender count stored there.
#[cfg_attr(
feature = "async",
doc = r##"
# Examples
```
let (sender, _r) = kanal::bounded::<u64>(0);
let sync_sender=sender.clone_async();
```
"##
)]
// The layout has to stay identical to `AsyncSender<T>`: `to_async` and
// `as_async` transmute between the two types.
#[repr(C)]
pub struct Sender<T> {
// Handle to the channel state; every accessor locks it via `acquire_internal`.
internal: Internal<T>,
}
/// Sending side of a channel whose `send` returns a [`SendFuture`].
///
/// Only available with the `async` feature.
#[cfg(feature = "async")]
// The layout has to stay identical to `Sender<T>`: `to_sync` and `as_sync`
// transmute between the two types.
#[repr(C)]
pub struct AsyncSender<T> {
// Handle to the channel state; every accessor locks it via `acquire_internal`.
internal: Internal<T>,
}
impl<T> Drop for Sender<T> {
    /// Unregisters this sender from the channel state.
    ///
    /// When the sender count reaches zero while receivers are still
    /// registered, every pending signal is terminated.
    fn drop(&mut self) {
        let mut shared = acquire_internal(&self.internal);
        // A zero count (for example after `close`) leaves nothing to undo.
        if shared.send_count == 0 {
            return;
        }
        shared.send_count -= 1;
        let last_sender_gone = shared.send_count == 0;
        if last_sender_gone && shared.recv_count != 0 {
            shared.terminate_signals();
        }
    }
}
#[cfg(feature = "async")]
impl<T> Drop for AsyncSender<T> {
    /// Unregisters this sender from the channel state.
    ///
    /// When the sender count reaches zero while receivers are still
    /// registered, every pending signal is terminated.
    fn drop(&mut self) {
        let mut shared = acquire_internal(&self.internal);
        // A zero count (for example after `close`) leaves nothing to undo.
        if shared.send_count == 0 {
            return;
        }
        shared.send_count -= 1;
        let last_sender_gone = shared.send_count == 0;
        if last_sender_gone && shared.recv_count != 0 {
            shared.terminate_signals();
        }
    }
}
impl<T> Clone for Sender<T> {
    /// Creates another sender for the same channel.
    ///
    /// The sender count is only incremented while it is non-zero, so a
    /// count that already dropped to zero is left untouched.
    fn clone(&self) -> Self {
        {
            let mut shared = acquire_internal(&self.internal);
            if shared.send_count != 0 {
                shared.send_count += 1;
            }
            // The lock is released here, before the handle itself is cloned.
        }
        let internal = self.internal.clone();
        Self { internal }
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Writes the fixed placeholder `Sender { .. }`; no channel state is
    /// printed and no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Sender { .. }")
    }
}
#[cfg(feature = "async")]
impl<T> Clone for AsyncSender<T> {
    /// Creates another async sender for the same channel.
    ///
    /// The sender count is only incremented while it is non-zero, so a
    /// count that already dropped to zero is left untouched.
    fn clone(&self) -> Self {
        {
            let mut shared = acquire_internal(&self.internal);
            if shared.send_count != 0 {
                shared.send_count += 1;
            }
            // The lock is released here, before the handle itself is cloned.
        }
        let internal = self.internal.clone();
        Self { internal }
    }
}
#[cfg(feature = "async")]
impl<T> fmt::Debug for AsyncSender<T> {
    /// Writes the fixed placeholder `AsyncSender { .. }`; no channel state
    /// is printed and no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("AsyncSender { .. }")
    }
}
// Methods shared by every sender and receiver type. The macro is expanded
// inside each `impl` block; every method locks the channel state first.
macro_rules! shared_impl {
    () => {
        /// Returns `true` when the channel capacity differs from `usize::MAX`.
        pub fn is_bounded(&self) -> bool {
            let guard = acquire_internal(&self.internal);
            guard.capacity != usize::MAX
        }

        /// Returns the number of items currently held in the channel queue.
        pub fn len(&self) -> usize {
            let guard = acquire_internal(&self.internal);
            guard.queue.len()
        }

        /// Returns `true` when the channel queue holds no items.
        pub fn is_empty(&self) -> bool {
            let guard = acquire_internal(&self.internal);
            guard.queue.is_empty()
        }

        /// Returns `true` when the queue length equals the channel capacity.
        pub fn is_full(&self) -> bool {
            let guard = acquire_internal(&self.internal);
            guard.queue.len() == guard.capacity
        }

        /// Returns the capacity value stored in the channel state.
        pub fn capacity(&self) -> usize {
            let guard = acquire_internal(&self.internal);
            guard.capacity
        }

        /// Returns the receiver count stored in the channel state.
        pub fn receiver_count(&self) -> u32 {
            let guard = acquire_internal(&self.internal);
            guard.recv_count
        }

        /// Returns the sender count stored in the channel state.
        pub fn sender_count(&self) -> u32 {
            let guard = acquire_internal(&self.internal);
            guard.send_count
        }

        /// Closes the channel.
        ///
        /// Both counts are reset to zero, pending signals are terminated
        /// and queued items are cleared.
        ///
        /// # Errors
        ///
        /// Returns `CloseError` when both counts are already zero.
        pub fn close(&self) -> Result<(), CloseError> {
            let mut guard = acquire_internal(&self.internal);
            let already_closed = guard.recv_count == 0 && guard.send_count == 0;
            if already_closed {
                return Err(CloseError());
            }
            guard.recv_count = 0;
            guard.send_count = 0;
            guard.terminate_signals();
            guard.queue.clear();
            Ok(())
        }

        /// Returns `true` when both the sender and receiver counts are zero.
        pub fn is_closed(&self) -> bool {
            let guard = acquire_internal(&self.internal);
            let no_senders = guard.send_count == 0;
            let no_receivers = guard.recv_count == 0;
            no_senders && no_receivers
        }
    };
}
// Non-blocking send methods shared by `Sender` and `AsyncSender`. The macro
// is expanded inside each sender `impl` block.
macro_rules! shared_send_impl {
    () => {
        /// Tries to send `data` without waiting.
        ///
        /// Returns `Ok(true)` when the value was handed to a waiting
        /// receiver or pushed to the queue, and `Ok(false)` when neither
        /// was possible (the value is dropped in that case).
        ///
        /// # Errors
        ///
        /// With a receiver count of zero: `SendError::Closed` when the
        /// sender count is zero too, `SendError::ReceiveClosed` otherwise.
        #[inline(always)]
        pub fn try_send(&self, data: T) -> Result<bool, SendError> {
            let mut guard = acquire_internal(&self.internal);
            if guard.recv_count == 0 {
                let no_senders = guard.send_count == 0;
                drop(guard);
                return Err(if no_senders {
                    SendError::Closed
                } else {
                    SendError::ReceiveClosed
                });
            }
            match guard.next_recv() {
                Some(waiting) => {
                    // Release the lock before handing the value over.
                    drop(guard);
                    unsafe { waiting.send(data) }
                    Ok(true)
                }
                None if guard.queue.len() < guard.capacity => {
                    guard.queue.push_back(data);
                    Ok(true)
                }
                None => Ok(false),
            }
        }

        /// Same as `try_send`, but takes the value out of `data` only when
        /// it is actually sent; on `Ok(false)` or `Err` the option keeps it.
        ///
        /// # Panics
        ///
        /// Panics when `data` is `None`.
        ///
        /// # Errors
        ///
        /// With a receiver count of zero: `SendError::Closed` when the
        /// sender count is zero too, `SendError::ReceiveClosed` otherwise.
        #[inline(always)]
        pub fn try_send_option(&self, data: &mut Option<T>) -> Result<bool, SendError> {
            assert!(data.is_some(), "send data option is None");
            let mut guard = acquire_internal(&self.internal);
            if guard.recv_count == 0 {
                let no_senders = guard.send_count == 0;
                drop(guard);
                return Err(if no_senders {
                    SendError::Closed
                } else {
                    SendError::ReceiveClosed
                });
            }
            match guard.next_recv() {
                Some(waiting) => {
                    // Release the lock before handing the value over.
                    drop(guard);
                    unsafe { waiting.send(data.take().unwrap()) }
                    Ok(true)
                }
                None if guard.queue.len() < guard.capacity => {
                    guard.queue.push_back(data.take().unwrap());
                    Ok(true)
                }
                None => Ok(false),
            }
        }

        /// Same as `try_send`, but also gives up with `Ok(false)` when
        /// `try_acquire_internal` does not hand out the channel lock.
        ///
        /// # Errors
        ///
        /// With a receiver count of zero: `SendError::Closed` when the
        /// sender count is zero too, `SendError::ReceiveClosed` otherwise.
        #[inline(always)]
        pub fn try_send_realtime(&self, data: T) -> Result<bool, SendError> {
            let mut guard = match try_acquire_internal(&self.internal) {
                Some(guard) => guard,
                None => return Ok(false),
            };
            if guard.recv_count == 0 {
                let no_senders = guard.send_count == 0;
                drop(guard);
                return Err(if no_senders {
                    SendError::Closed
                } else {
                    SendError::ReceiveClosed
                });
            }
            match guard.next_recv() {
                Some(waiting) => {
                    // Release the lock before handing the value over.
                    drop(guard);
                    unsafe { waiting.send(data) }
                    Ok(true)
                }
                None if guard.queue.len() < guard.capacity => {
                    guard.queue.push_back(data);
                    Ok(true)
                }
                None => Ok(false),
            }
        }

        /// Same as `try_send_option`, but also gives up with `Ok(false)`
        /// when `try_acquire_internal` does not hand out the channel lock.
        ///
        /// # Panics
        ///
        /// Panics when `data` is `None`.
        ///
        /// # Errors
        ///
        /// With a receiver count of zero: `SendError::Closed` when the
        /// sender count is zero too, `SendError::ReceiveClosed` otherwise.
        #[inline(always)]
        pub fn try_send_option_realtime(&self, data: &mut Option<T>) -> Result<bool, SendError> {
            assert!(data.is_some(), "send data option is None");
            let mut guard = match try_acquire_internal(&self.internal) {
                Some(guard) => guard,
                None => return Ok(false),
            };
            if guard.recv_count == 0 {
                let no_senders = guard.send_count == 0;
                drop(guard);
                return Err(if no_senders {
                    SendError::Closed
                } else {
                    SendError::ReceiveClosed
                });
            }
            match guard.next_recv() {
                Some(waiting) => {
                    // Release the lock before handing the value over.
                    drop(guard);
                    unsafe { waiting.send(data.take().unwrap()) }
                    Ok(true)
                }
                None if guard.queue.len() < guard.capacity => {
                    guard.queue.push_back(data.take().unwrap());
                    Ok(true)
                }
                None => Ok(false),
            }
        }

        /// Returns `true` when the receiver count is zero.
        pub fn is_disconnected(&self) -> bool {
            let guard = acquire_internal(&self.internal);
            guard.recv_count == 0
        }
    };
}
// Non-blocking receive methods shared by `Receiver` and `AsyncReceiver`.
// The macro is expanded inside each receiver `impl` block.
macro_rules! shared_recv_impl {
    () => {
        /// Tries to receive a value without waiting.
        ///
        /// Returns `Ok(Some(v))` when a value was taken from the queue or
        /// directly from a pending sender, and `Ok(None)` when nothing is
        /// available but senders still exist.
        ///
        /// # Errors
        ///
        /// `ReceiveError::Closed` when the receiver count is zero,
        /// `ReceiveError::SendClosed` when nothing is available and the
        /// sender count is zero.
        #[inline(always)]
        pub fn try_recv(&self) -> Result<Option<T>, ReceiveError> {
            let mut internal = acquire_internal(&self.internal);
            if internal.recv_count == 0 {
                return Err(ReceiveError::Closed);
            }
            if let Some(v) = internal.queue.pop_front() {
                // A slot was freed: move one pending sender's value into the queue.
                if let Some(p) = internal.next_send() {
                    // SAFETY: relies on the contract of the signal's `recv`, which
                    // is defined outside this file (NOTE(review): presumably one
                    // `recv` per signal taken from the wait list; confirm in `signal`).
                    unsafe { internal.queue.push_back(p.recv()) }
                }
                return Ok(Some(v));
            } else if let Some(p) = internal.next_send() {
                // Release the lock before taking the value from the sender.
                drop(internal);
                // SAFETY: same contract as above.
                return unsafe { Ok(Some(p.recv())) };
            }
            if internal.send_count == 0 {
                return Err(ReceiveError::SendClosed);
            }
            Ok(None)
        }

        /// Same as `try_recv`, but also gives up with `Ok(None)` when
        /// `try_acquire_internal` does not hand out the channel lock.
        ///
        /// # Errors
        ///
        /// `ReceiveError::Closed` when the receiver count is zero,
        /// `ReceiveError::SendClosed` when nothing is available and the
        /// sender count is zero.
        #[inline(always)]
        pub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError> {
            if let Some(mut internal) = try_acquire_internal(&self.internal) {
                if internal.recv_count == 0 {
                    return Err(ReceiveError::Closed);
                }
                if let Some(v) = internal.queue.pop_front() {
                    // A slot was freed: move one pending sender's value into the queue.
                    if let Some(p) = internal.next_send() {
                        // SAFETY: relies on the contract of the signal's `recv`,
                        // defined outside this file (NOTE(review): confirm in `signal`).
                        unsafe { internal.queue.push_back(p.recv()) }
                    }
                    return Ok(Some(v));
                } else if let Some(p) = internal.next_send() {
                    // Release the lock before taking the value from the sender.
                    drop(internal);
                    // SAFETY: same contract as above.
                    return unsafe { Ok(Some(p.recv())) };
                }
                if internal.send_count == 0 {
                    return Err(ReceiveError::SendClosed);
                }
            }
            Ok(None)
        }

        /// Moves every queued value, followed by the value of every pending
        /// sender, to the end of `vec`.
        ///
        /// Returns the item count computed before draining: the queue
        /// length plus, unless the wait list is flagged as holding
        /// receivers (`recv_blocking`), the wait-list length.
        ///
        /// # Errors
        ///
        /// `ReceiveError::Closed` when the receiver count is zero.
        pub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError> {
            let mut internal = acquire_internal(&self.internal);
            if internal.recv_count == 0 {
                return Err(ReceiveError::Closed);
            }
            let pending_senders = if internal.recv_blocking {
                0
            } else {
                internal.wait_list.len()
            };
            let required_cap = internal.queue.len() + pending_senders;
            // `Vec::reserve` takes the number of *additional* elements beyond
            // `len` and does nothing when the capacity already suffices, so
            // the item count is passed directly.
            vec.reserve(required_cap);
            while let Some(v) = internal.queue.pop_front() {
                vec.push(v);
            }
            while let Some(p) = internal.next_send() {
                // SAFETY: relies on the contract of the signal's `recv`, defined
                // outside this file (NOTE(review): confirm in `signal`).
                unsafe { vec.push(p.recv()) }
            }
            Ok(required_cap)
        }

        /// Returns `true` when the sender count is zero.
        pub fn is_disconnected(&self) -> bool {
            acquire_internal(&self.internal).send_count == 0
        }

        /// Returns `true` when the sender count is zero and the queue is empty.
        pub fn is_terminated(&self) -> bool {
            let internal = acquire_internal(&self.internal);
            internal.send_count == 0 && internal.queue.is_empty()
        }
    };
}
impl<T> Sender<T> {
    /// Sends `data` to the channel, waiting on a signal when the value can
    /// neither be handed to a pending receiver nor pushed to the queue.
    ///
    /// # Errors
    ///
    /// With a receiver count of zero at entry: `SendError::Closed` when the
    /// sender count is zero too, `SendError::ReceiveClosed` otherwise.
    /// `SendError::Closed` is also returned when the wait ends without the
    /// value being taken; the value is dropped in that case.
    #[inline(always)]
    pub fn send(&self, data: T) -> Result<(), SendError> {
        let mut internal = acquire_internal(&self.internal);
        if internal.recv_count == 0 {
            let send_count = internal.send_count;
            drop(internal);
            if send_count == 0 {
                return Err(SendError::Closed);
            }
            return Err(SendError::ReceiveClosed);
        }
        if let Some(first) = internal.next_recv() {
            // Release the lock before handing the value to the receiver.
            drop(internal);
            // SAFETY: relies on the contract of the signal's `send`, defined
            // outside this file (NOTE(review): presumably one `send` per
            // signal taken from the wait list; confirm in `signal`).
            unsafe { first.send(data) }
            Ok(())
        } else if internal.queue.len() < internal.capacity {
            internal.queue.push_back(data);
            Ok(())
        } else {
            // The value is kept in `MaybeUninit` so it is not dropped here
            // after a receiver has taken it through the signal's pointer.
            let mut data = MaybeUninit::new(data);
            let sig = Signal::new_sync(KanalPtr::new_from(data.as_mut_ptr()));
            internal.push_send(sig.get_terminator());
            drop(internal);
            if !sig.wait() {
                // The value was not taken, so this function still owns it.
                if needs_drop::<T>() {
                    // SAFETY: `data` was initialised by `MaybeUninit::new`
                    // and was not moved out on this path.
                    unsafe { data.assume_init_drop() }
                }
                return Err(SendError::Closed);
            }
            Ok(())
        }
    }

    /// Same as [`Sender::send`], but stops waiting once `duration` has
    /// elapsed.
    ///
    /// # Panics
    ///
    /// Panics when `Instant::now() + duration` is not representable.
    ///
    /// # Errors
    ///
    /// With a receiver count of zero at entry: `SendErrorTimeout::Closed`
    /// when the sender count is zero too, `SendErrorTimeout::ReceiveClosed`
    /// otherwise. `SendErrorTimeout::Closed` is also returned when the
    /// signal is terminated while waiting, and `SendErrorTimeout::Timeout`
    /// when the deadline passes and the pending send was cancelled. The
    /// value is dropped in both of those cases.
    #[inline(always)]
    pub fn send_timeout(&self, data: T, duration: Duration) -> Result<(), SendErrorTimeout> {
        let deadline = Instant::now().checked_add(duration).unwrap();
        let mut internal = acquire_internal(&self.internal);
        if internal.recv_count == 0 {
            let send_count = internal.send_count;
            drop(internal);
            if send_count == 0 {
                return Err(SendErrorTimeout::Closed);
            }
            return Err(SendErrorTimeout::ReceiveClosed);
        }
        if let Some(first) = internal.next_recv() {
            // Release the lock before handing the value to the receiver.
            drop(internal);
            // SAFETY: relies on the contract of the signal's `send`, defined
            // outside this file (NOTE(review): confirm in `signal`).
            unsafe { first.send(data) }
            Ok(())
        } else if internal.queue.len() < internal.capacity {
            internal.queue.push_back(data);
            Ok(())
        } else {
            // The value is kept in `MaybeUninit` so it is not dropped here
            // after a receiver has taken it through the signal's pointer.
            let mut data = MaybeUninit::new(data);
            let sig = Signal::new_sync(KanalPtr::new_from(data.as_mut_ptr()));
            internal.push_send(sig.get_terminator());
            drop(internal);
            if !sig.wait_timeout(deadline) {
                if sig.is_terminated() {
                    if needs_drop::<T>() {
                        // SAFETY: the value was not taken; `data` is still initialised.
                        unsafe { data.assume_init_drop() }
                    }
                    return Err(SendErrorTimeout::Closed);
                }
                // Try to withdraw the pending send. The lock is released
                // before the value is dropped so its destructor never runs
                // under the channel lock.
                let mut internal = acquire_internal(&self.internal);
                let cancelled = internal.cancel_send_signal(&sig);
                drop(internal);
                if cancelled {
                    // The send was withdrawn, so this function still owns the
                    // value; dropping it here stops it from being leaked.
                    if needs_drop::<T>() {
                        // SAFETY: `data` is still initialised and no receiver
                        // took it (same assumption as the restore in
                        // `send_option_timeout`).
                        unsafe { data.assume_init_drop() }
                    }
                    return Err(SendErrorTimeout::Timeout);
                }
                // Cancelling failed, so wait for the signal's final outcome.
                if !sig.wait() {
                    if needs_drop::<T>() {
                        // SAFETY: the value was not taken; `data` is still initialised.
                        unsafe { data.assume_init_drop() }
                    }
                    return Err(SendErrorTimeout::Closed);
                }
            }
            Ok(())
        }
    }

    /// Same as [`Sender::send_timeout`], but takes the value out of `data`
    /// and puts it back when the waiting send fails.
    ///
    /// # Panics
    ///
    /// Panics when `data` is `None`, or when `Instant::now() + duration` is
    /// not representable.
    ///
    /// # Errors
    ///
    /// With a receiver count of zero at entry: `SendErrorTimeout::Closed`
    /// when the sender count is zero too, `SendErrorTimeout::ReceiveClosed`
    /// otherwise; `data` is left untouched. While waiting:
    /// `SendErrorTimeout::Closed` when the signal is terminated and
    /// `SendErrorTimeout::Timeout` when the deadline passes and the pending
    /// send was cancelled; `data` holds the value again in both cases.
    #[inline(always)]
    pub fn send_option_timeout(
        &self,
        data: &mut Option<T>,
        duration: Duration,
    ) -> Result<(), SendErrorTimeout> {
        if data.is_none() {
            panic!("send data option is None");
        }
        let deadline = Instant::now().checked_add(duration).unwrap();
        let mut internal = acquire_internal(&self.internal);
        if internal.recv_count == 0 {
            let send_count = internal.send_count;
            drop(internal);
            if send_count == 0 {
                return Err(SendErrorTimeout::Closed);
            }
            return Err(SendErrorTimeout::ReceiveClosed);
        }
        if let Some(first) = internal.next_recv() {
            // Release the lock before handing the value to the receiver.
            drop(internal);
            // SAFETY: relies on the contract of the signal's `send`, defined
            // outside this file (NOTE(review): confirm in `signal`).
            unsafe { first.send(data.take().unwrap()) }
            Ok(())
        } else if internal.queue.len() < internal.capacity {
            internal.queue.push_back(data.take().unwrap());
            Ok(())
        } else {
            // The value is kept in `MaybeUninit`, as in `send`. With a plain
            // local it would be dropped at the end of this scope after a
            // receiver had already taken it, which is a double drop.
            let mut d = MaybeUninit::new(data.take().unwrap());
            let sig = Signal::new_sync(KanalPtr::new_from(d.as_mut_ptr()));
            internal.push_send(sig.get_terminator());
            drop(internal);
            if !sig.wait_timeout(deadline) {
                if sig.is_terminated() {
                    // SAFETY: the value was not taken; `d` is still initialised.
                    *data = Some(unsafe { d.assume_init() });
                    return Err(SendErrorTimeout::Closed);
                }
                {
                    let mut internal = acquire_internal(&self.internal);
                    if internal.cancel_send_signal(&sig) {
                        // SAFETY: the pending send was withdrawn, so the
                        // value was not taken; `d` is still initialised.
                        *data = Some(unsafe { d.assume_init() });
                        return Err(SendErrorTimeout::Timeout);
                    }
                }
                // Cancelling failed, so wait for the signal's final outcome.
                if !sig.wait() {
                    // SAFETY: the value was not taken; `d` is still initialised.
                    *data = Some(unsafe { d.assume_init() });
                    return Err(SendErrorTimeout::Closed);
                }
            }
            // Success: the receiver owns the value now, and `d` (a
            // `MaybeUninit`) is deliberately not dropped.
            Ok(())
        }
    }

    shared_send_impl!();

    /// Creates an [`AsyncSender`] for the same channel, incrementing the
    /// sender count when it is non-zero.
    #[cfg(feature = "async")]
    pub fn clone_async(&self) -> AsyncSender<T> {
        let mut internal = acquire_internal(&self.internal);
        if internal.send_count > 0 {
            internal.send_count += 1;
        }
        drop(internal);
        AsyncSender::<T> {
            internal: self.internal.clone(),
        }
    }

    /// Converts this sender into an [`AsyncSender`] without touching the
    /// sender count.
    #[cfg(feature = "async")]
    pub fn to_async(self) -> AsyncSender<T> {
        // SAFETY: both types are `#[repr(C)]` structs with a single
        // `Internal<T>` field, so their layouts are identical.
        unsafe { transmute(self) }
    }

    /// Borrows this sender as an [`AsyncSender`].
    #[cfg(feature = "async")]
    pub fn as_async(&self) -> &AsyncSender<T> {
        // SAFETY: both types are `#[repr(C)]` structs with a single
        // `Internal<T>` field, so a reference to one is valid as the other.
        unsafe { transmute(self) }
    }

    shared_impl!();
}
#[cfg(feature = "async")]
impl<T> AsyncSender<T> {
    /// Builds a [`SendFuture`] for `data` that borrows this sender's
    /// channel state.
    #[inline(always)]
    pub fn send(&self, data: T) -> SendFuture<'_, T> {
        SendFuture::new(&self.internal, data)
    }

    shared_send_impl!();

    /// Creates a [`Sender`] for the same channel, incrementing the sender
    /// count when it is non-zero.
    pub fn clone_sync(&self) -> Sender<T> {
        {
            let mut shared = acquire_internal(&self.internal);
            if shared.send_count != 0 {
                shared.send_count += 1;
            }
            // The lock is released here, before the handle itself is cloned.
        }
        let internal = self.internal.clone();
        Sender::<T> { internal }
    }

    /// Converts this sender into a [`Sender`] without touching the sender
    /// count.
    pub fn to_sync(self) -> Sender<T> {
        // SAFETY: both types are `#[repr(C)]` structs with a single
        // `Internal<T>` field, so their layouts are identical.
        unsafe { transmute::<Self, Sender<T>>(self) }
    }

    /// Borrows this sender as a [`Sender`].
    pub fn as_sync(&self) -> &Sender<T> {
        // SAFETY: both types are `#[repr(C)]` structs with a single
        // `Internal<T>` field, so a reference to one is valid as the other.
        unsafe { transmute::<&Self, &Sender<T>>(self) }
    }

    shared_impl!();
}
/// Receiving side of a channel, exposing a synchronous `recv` API.
///
/// The struct only wraps a handle to the channel state (`Internal<T>`);
/// cloning and dropping a `Receiver` adjust the receiver count stored there.
#[cfg_attr(
feature = "async",
doc = r##"
# Examples
```
let (_s, receiver) = kanal::bounded::<u64>(0);
let async_receiver=receiver.clone_async();
```
"##
)]
// The layout has to stay identical to `AsyncReceiver<T>`: `to_async` and
// `as_async` transmute between the two types.
#[repr(C)]
pub struct Receiver<T> {
// Handle to the channel state; every accessor locks it via `acquire_internal`.
internal: Internal<T>,
}
impl<T> fmt::Debug for Receiver<T> {
    /// Writes the fixed placeholder `Receiver { .. }`; no channel state is
    /// printed and no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Receiver { .. }")
    }
}
/// Receiving side of a channel whose `recv` returns a [`ReceiveFuture`].
///
/// Only available with the `async` feature.
#[cfg(feature = "async")]
// The layout has to stay identical to `Receiver<T>`: `to_sync` and `as_sync`
// transmute between the two types.
#[repr(C)]
pub struct AsyncReceiver<T> {
// Handle to the channel state; every accessor locks it via `acquire_internal`.
internal: Internal<T>,
}
#[cfg(feature = "async")]
impl<T> fmt::Debug for AsyncReceiver<T> {
    /// Writes the fixed placeholder `AsyncReceiver { .. }`; no channel
    /// state is printed and no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("AsyncReceiver { .. }")
    }
}
impl<T> Receiver<T> {
    /// Receives a value from the channel, waiting on a signal when neither
    /// the queue nor a pending sender can provide one.
    ///
    /// # Errors
    ///
    /// `ReceiveError::Closed` when the receiver count is zero at entry or
    /// when the wait ends without a value; `ReceiveError::SendClosed` when
    /// nothing is available and the sender count is zero.
    #[inline(always)]
    pub fn recv(&self) -> Result<T, ReceiveError> {
        let mut guard = acquire_internal(&self.internal);
        if guard.recv_count == 0 {
            return Err(ReceiveError::Closed);
        }
        if let Some(item) = guard.queue.pop_front() {
            // A slot was freed: move one pending sender's value into the queue.
            if let Some(pending) = guard.next_send() {
                unsafe { guard.queue.push_back(pending.recv()) }
            }
            return Ok(item);
        }
        if let Some(pending) = guard.next_send() {
            // Release the lock before taking the value from the sender.
            drop(guard);
            return unsafe { Ok(pending.recv()) };
        }
        if guard.send_count == 0 {
            return Err(ReceiveError::SendClosed);
        }
        // Nothing is available: register a signal that points at `slot`
        // and wait on it.
        let mut slot = MaybeUninit::<T>::uninit();
        let sig = Signal::new_sync(KanalPtr::new_write_address_ptr(slot.as_mut_ptr()));
        guard.push_recv(sig.get_terminator());
        drop(guard);
        if !sig.wait() {
            return Err(ReceiveError::Closed);
        }
        // NOTE(review): values larger than a pointer are read from `slot`,
        // smaller ones from the signal itself. Presumably small values
        // travel inline in the pointer; confirm in `pointer`/`signal`.
        let larger_than_pointer = size_of::<T>() > size_of::<*mut T>();
        if larger_than_pointer {
            Ok(unsafe { slot.assume_init() })
        } else {
            Ok(unsafe { sig.assume_init() })
        }
    }

    /// Same as [`Receiver::recv`], but stops waiting once `duration` has
    /// elapsed.
    ///
    /// # Panics
    ///
    /// Panics when `Instant::now() + duration` is not representable.
    ///
    /// # Errors
    ///
    /// `ReceiveErrorTimeout::Closed` when the receiver count is zero at
    /// entry or the signal is terminated while waiting;
    /// `ReceiveErrorTimeout::Timeout` when the deadline has passed and
    /// nothing is available, or the pending receive was cancelled after the
    /// timed wait failed; `ReceiveErrorTimeout::SendClosed` when nothing is
    /// available and the sender count is zero.
    #[inline(always)]
    pub fn recv_timeout(&self, duration: Duration) -> Result<T, ReceiveErrorTimeout> {
        let deadline = Instant::now().checked_add(duration).unwrap();
        let mut guard = acquire_internal(&self.internal);
        if guard.recv_count == 0 {
            return Err(ReceiveErrorTimeout::Closed);
        }
        if let Some(item) = guard.queue.pop_front() {
            // A slot was freed: move one pending sender's value into the queue.
            if let Some(pending) = guard.next_send() {
                unsafe { guard.queue.push_back(pending.recv()) }
            }
            return Ok(item);
        }
        if let Some(pending) = guard.next_send() {
            // Release the lock before taking the value from the sender.
            drop(guard);
            return unsafe { Ok(pending.recv()) };
        }
        // The deadline check comes before the sender-count check, so an
        // expired deadline reports `Timeout` even when no senders are left.
        if Instant::now() > deadline {
            return Err(ReceiveErrorTimeout::Timeout);
        }
        if guard.send_count == 0 {
            return Err(ReceiveErrorTimeout::SendClosed);
        }
        let mut slot = MaybeUninit::<T>::uninit();
        let sig = Signal::new_sync(KanalPtr::new_write_address_ptr(slot.as_mut_ptr()));
        guard.push_recv(sig.get_terminator());
        drop(guard);
        if !sig.wait_timeout(deadline) {
            if sig.is_terminated() {
                return Err(ReceiveErrorTimeout::Closed);
            }
            // Try to withdraw the pending receive from the channel state.
            let mut guard = acquire_internal(&self.internal);
            let cancelled = guard.cancel_recv_signal(&sig);
            drop(guard);
            if cancelled {
                return Err(ReceiveErrorTimeout::Timeout);
            }
            // Cancelling failed, so wait for the signal's final outcome.
            if !sig.wait() {
                return Err(ReceiveErrorTimeout::Closed);
            }
        }
        // NOTE(review): same size-based split as in `recv`; confirm in
        // `pointer`/`signal`.
        let larger_than_pointer = size_of::<T>() > size_of::<*mut T>();
        if larger_than_pointer {
            Ok(unsafe { slot.assume_init() })
        } else {
            Ok(unsafe { sig.assume_init() })
        }
    }

    shared_recv_impl!();

    /// Creates an [`AsyncReceiver`] for the same channel, incrementing the
    /// receiver count when it is non-zero.
    #[cfg(feature = "async")]
    pub fn clone_async(&self) -> AsyncReceiver<T> {
        {
            let mut shared = acquire_internal(&self.internal);
            if shared.recv_count != 0 {
                shared.recv_count += 1;
            }
            // The lock is released here, before the handle itself is cloned.
        }
        let internal = self.internal.clone();
        AsyncReceiver::<T> { internal }
    }

    /// Converts this receiver into an [`AsyncReceiver`] without touching
    /// the receiver count.
    #[cfg(feature = "async")]
    pub fn to_async(self) -> AsyncReceiver<T> {
        // SAFETY: both types are `#[repr(C)]` structs with a single
        // `Internal<T>` field, so their layouts are identical.
        unsafe { transmute::<Self, AsyncReceiver<T>>(self) }
    }

    /// Borrows this receiver as an [`AsyncReceiver`].
    #[cfg(feature = "async")]
    pub fn as_async(&self) -> &AsyncReceiver<T> {
        // SAFETY: both types are `#[repr(C)]` structs with a single
        // `Internal<T>` field, so a reference to one is valid as the other.
        unsafe { transmute::<&Self, &AsyncReceiver<T>>(self) }
    }

    shared_impl!();
}
impl<T> Iterator for Receiver<T> {
    type Item = T;

    /// Yields the next value from `recv`; any receive error ends the
    /// iteration with `None`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.recv() {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}
#[cfg(feature = "async")]
impl<T> AsyncReceiver<T> {
    /// Builds a [`ReceiveFuture`] that borrows this receiver's channel
    /// state.
    #[inline(always)]
    pub fn recv(&self) -> ReceiveFuture<'_, T> {
        ReceiveFuture::new_ref(&self.internal)
    }

    /// Builds a [`ReceiveStream`] that borrows this receiver.
    #[inline(always)]
    pub fn stream(&self) -> ReceiveStream<'_, T> {
        ReceiveStream::new_borrowed(self)
    }

    shared_recv_impl!();

    /// Creates a [`Receiver`] for the same channel, incrementing the
    /// receiver count when it is non-zero.
    pub fn clone_sync(&self) -> Receiver<T> {
        {
            let mut shared = acquire_internal(&self.internal);
            if shared.recv_count != 0 {
                shared.recv_count += 1;
            }
            // The lock is released here, before the handle itself is cloned.
        }
        let internal = self.internal.clone();
        Receiver::<T> { internal }
    }

    /// Converts this receiver into a [`Receiver`] without touching the
    /// receiver count.
    pub fn to_sync(self) -> Receiver<T> {
        // SAFETY: both types are `#[repr(C)]` structs with a single
        // `Internal<T>` field, so their layouts are identical.
        unsafe { transmute::<Self, Receiver<T>>(self) }
    }

    /// Borrows this receiver as a [`Receiver`].
    pub fn as_sync(&self) -> &Receiver<T> {
        // SAFETY: both types are `#[repr(C)]` structs with a single
        // `Internal<T>` field, so a reference to one is valid as the other.
        unsafe { transmute::<&Self, &Receiver<T>>(self) }
    }

    shared_impl!();
}
impl<T> Drop for Receiver<T> {
    /// Unregisters this receiver from the channel state.
    ///
    /// When the receiver count reaches zero while senders are still
    /// registered, every pending signal is terminated.
    fn drop(&mut self) {
        let mut shared = acquire_internal(&self.internal);
        // A zero count (for example after `close`) leaves nothing to undo.
        if shared.recv_count == 0 {
            return;
        }
        shared.recv_count -= 1;
        let last_receiver_gone = shared.recv_count == 0;
        if last_receiver_gone && shared.send_count != 0 {
            shared.terminate_signals();
        }
    }
}
#[cfg(feature = "async")]
impl<T> Drop for AsyncReceiver<T> {
    /// Unregisters this receiver from the channel state.
    ///
    /// When the receiver count reaches zero while senders are still
    /// registered, every pending signal is terminated.
    fn drop(&mut self) {
        let mut shared = acquire_internal(&self.internal);
        // A zero count (for example after `close`) leaves nothing to undo.
        if shared.recv_count == 0 {
            return;
        }
        shared.recv_count -= 1;
        let last_receiver_gone = shared.recv_count == 0;
        if last_receiver_gone && shared.send_count != 0 {
            shared.terminate_signals();
        }
    }
}
impl<T> Clone for Receiver<T> {
    /// Creates another receiver for the same channel.
    ///
    /// The receiver count is only incremented while it is non-zero, so a
    /// count that already dropped to zero is left untouched.
    fn clone(&self) -> Self {
        {
            let mut shared = acquire_internal(&self.internal);
            if shared.recv_count != 0 {
                shared.recv_count += 1;
            }
            // The lock is released here, before the handle itself is cloned.
        }
        let internal = self.internal.clone();
        Self { internal }
    }
}
#[cfg(feature = "async")]
impl<T> Clone for AsyncReceiver<T> {
    /// Creates another async receiver for the same channel.
    ///
    /// The receiver count is only incremented while it is non-zero, so a
    /// count that already dropped to zero is left untouched.
    fn clone(&self) -> Self {
        {
            let mut shared = acquire_internal(&self.internal);
            if shared.recv_count != 0 {
                shared.recv_count += 1;
            }
            // The lock is released here, before the handle itself is cloned.
        }
        let internal = self.internal.clone();
        Self { internal }
    }
}
/// Creates a bounded channel of the requested `size` and returns its
/// synchronous sender and receiver, both backed by the same channel state.
pub fn bounded<T>(size: usize) -> (Sender<T>, Receiver<T>) {
    let shared = ChannelInternal::new(true, size);
    let sender = Sender {
        internal: shared.clone(),
    };
    let receiver = Receiver { internal: shared };
    (sender, receiver)
}
/// Creates a bounded channel of the requested `size` and returns its async
/// sender and receiver, both backed by the same channel state.
#[cfg(feature = "async")]
pub fn bounded_async<T>(size: usize) -> (AsyncSender<T>, AsyncReceiver<T>) {
    let shared = ChannelInternal::new(true, size);
    let sender = AsyncSender {
        internal: shared.clone(),
    };
    let receiver = AsyncReceiver { internal: shared };
    (sender, receiver)
}
// Size argument that `unbounded` and `unbounded_async` pass to
// `ChannelInternal::new`.
// NOTE(review): presumably the initial queue allocation, not a limit; confirm in `internal`.
const UNBOUNDED_STARTING_SIZE: usize = 32;
/// Creates an unbounded channel and returns its synchronous sender and
/// receiver, both backed by the same channel state.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let shared = ChannelInternal::new(false, UNBOUNDED_STARTING_SIZE);
    let sender = Sender {
        internal: shared.clone(),
    };
    let receiver = Receiver { internal: shared };
    (sender, receiver)
}
/// Creates an unbounded channel and returns its async sender and receiver,
/// both backed by the same channel state.
#[cfg(feature = "async")]
pub fn unbounded_async<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
    let shared = ChannelInternal::new(false, UNBOUNDED_STARTING_SIZE);
    let sender = AsyncSender {
        internal: shared.clone(),
    };
    let receiver = AsyncReceiver { internal: shared };
    (sender, receiver)
}