#![doc = include_str!("../README.md")]
#![warn(missing_docs, missing_debug_implementations)]
pub(crate) mod backoff;
pub(crate) mod internal;
#[cfg(not(feature = "std-mutex"))]
pub(crate) mod mutex;
pub(crate) mod pointer;
mod error;
#[cfg(feature = "async")]
mod future;
mod signal;
pub use error::*;
#[cfg(feature = "async")]
pub use future::*;
#[cfg(feature = "async")]
use core::mem::transmute;
use core::{
fmt,
mem::{size_of, MaybeUninit},
pin::pin,
time::Duration,
};
use std::{collections::VecDeque, time::Instant};
use branches::unlikely;
use internal::{acquire_internal, try_acquire_internal, Internal};
use pointer::KanalPtr;
use signal::*;
/// Sending half of a channel for synchronous (blocking) code.
///
/// The handle wraps the channel's `Internal<T>` state. Cloning goes through
/// `Internal::clone_send` and dropping through `Internal::drop_send`.
///
/// The type is `#[repr(C)]` with a single field, the same shape as
/// `AsyncSender`; the `to_async` / `as_async` conversions transmute between
/// the two.
#[cfg_attr(
    feature = "async",
    doc = r##"
# Examples
```
let (sender, _r) = kanal_plus::bounded::<u64>(0);
let async_sender = sender.clone_async();
```
"##
)]
#[repr(C)]
pub struct Sender<T> {
    internal: Internal<T>,
}
/// Sending half of a channel for asynchronous code.
///
/// Wraps the same `Internal<T>` state as `Sender` and is likewise
/// `#[repr(C)]` with a single field; `to_sync` / `as_sync` transmute between
/// the two types.
#[cfg(feature = "async")]
#[repr(C)]
pub struct AsyncSender<T> {
internal: Internal<T>,
}
impl<T> Drop for Sender<T> {
    /// Delegates the teardown of this sending handle to `Internal::drop_send`.
    fn drop(&mut self) {
        let Self { internal } = self;
        internal.drop_send();
    }
}
#[cfg(feature = "async")]
impl<T> Drop for AsyncSender<T> {
    /// Delegates the teardown of this sending handle to `Internal::drop_send`.
    fn drop(&mut self) {
        let Self { internal } = self;
        internal.drop_send();
    }
}
impl<T> Clone for Sender<T> {
    /// Produces another sending handle for the same channel by way of
    /// `Internal::clone_send`.
    fn clone(&self) -> Self {
        let internal = self.internal.clone_send();
        Self { internal }
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Writes a fixed placeholder; the channel state itself is not printed,
    /// so no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("Sender {{ .. }}"))
    }
}
#[cfg(feature = "async")]
impl<T> Clone for AsyncSender<T> {
    /// Produces another sending handle for the same channel by way of
    /// `Internal::clone_send`.
    fn clone(&self) -> Self {
        let internal = self.internal.clone_send();
        Self { internal }
    }
}
#[cfg(feature = "async")]
impl<T> fmt::Debug for AsyncSender<T> {
    /// Writes a fixed placeholder; the channel state itself is not printed,
    /// so no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("AsyncSender {{ .. }}"))
    }
}
// Early-return guard used by the non-blocking send paths.
//
// `$internal` is the acquired channel state and `$data` the value being sent.
// When `recv_count` is zero, the acquired state is dropped first and the
// enclosing function returns `SendTimeoutError::Closed`, handing `$data` back
// to the caller.
macro_rules! check_recv_closed_timeout {
($internal:ident,$data:ident) => {
if unlikely($internal.recv_count == 0) {
drop($internal);
return Err(SendTimeoutError::Closed($data));
}
};
}
// Methods pasted into every handle type (`Sender`, `AsyncSender`, `Receiver`,
// `AsyncReceiver`). All of them only need `self.internal`.
// NOTE(review): `acquire_internal` presumably returns a lock guard over the
// shared channel state; its definition lives in `internal` — confirm there.
macro_rules! shared_impl {
() => {
/// Returns `true` when the channel's capacity is anything other than
/// `usize::MAX`.
pub fn is_bounded(&self) -> bool {
self.internal.capacity() != usize::MAX
}
/// Returns the number of items currently held in the internal queue.
pub fn len(&self) -> usize {
acquire_internal(&self.internal).queue.len()
}
/// Returns `true` when the internal queue holds no items.
pub fn is_empty(&self) -> bool {
acquire_internal(&self.internal).queue.is_empty()
}
/// Returns `true` when the queue length equals the capacity.
///
/// For a zero-capacity channel this is `true` whenever the queue is empty.
pub fn is_full(&self) -> bool {
self.internal.capacity() == acquire_internal(&self.internal).queue.len()
}
/// Returns the capacity reported by the channel's internal state.
pub fn capacity(&self) -> usize {
self.internal.capacity()
}
/// Returns the channel's current `recv_count`, widened to `usize`.
pub fn receiver_count(&self) -> usize {
acquire_internal(&self.internal).recv_count as usize
}
/// Returns the channel's current `send_count`, widened to `usize`.
pub fn sender_count(&self) -> usize {
acquire_internal(&self.internal).send_count as usize
}
/// Closes the channel for both sides.
///
/// Both counters are reset to zero, `terminate_signals` is invoked on the
/// internal state and every item still in the queue is discarded.
///
/// # Errors
///
/// Returns `CloseError` when both counters were already zero.
pub fn close(&self) -> Result<(), CloseError> {
let mut internal = acquire_internal(&self.internal);
if unlikely(internal.recv_count == 0 && internal.send_count == 0) {
return Err(CloseError());
}
internal.recv_count = 0;
internal.send_count = 0;
internal.terminate_signals();
internal.queue.clear();
Ok(())
}
/// Returns `true` when both the sender and the receiver counters are zero.
pub fn is_closed(&self) -> bool {
let internal = acquire_internal(&self.internal);
internal.send_count == 0 && internal.recv_count == 0
}
};
}
// Non-blocking send methods pasted into both `Sender` and `AsyncSender`.
macro_rules! shared_send_impl {
() => {
/// Tries to send `data` without waiting for a receiver or for queue space.
///
/// The value is handed to a waiting receiver if `next_recv` yields one;
/// otherwise it is queued when the capacity is non-zero and the queue is
/// not yet full.
///
/// # Errors
///
/// * `SendTimeoutError::Closed(data)` when `recv_count` is zero.
/// * `SendTimeoutError::Timeout(data)` when there is neither a waiting
///   receiver nor room in the queue.
#[inline(always)]
pub fn try_send(&self, data: T) -> Result<(), SendTimeoutError<T>> {
let cap = self.internal.capacity();
let mut internal = acquire_internal(&self.internal);
check_recv_closed_timeout!(internal, data);
if let Some(first) = internal.next_recv() {
// The acquired state is released before the value is handed over.
drop(internal);
// NOTE(review): soundness relies on the signal contract defined in
// `signal`/`internal` (not visible here); the signal was just taken off
// the wait list and is used exactly once.
unsafe { first.send(data) }
return Ok(());
}
if cap > 0 && internal.queue.len() < cap {
internal.queue.push_back(data);
return Ok(());
}
Err(SendTimeoutError::Timeout(data))
}
/// Same as `try_send`, but the internal state is obtained with
/// `try_acquire_internal`; when that yields `None` the call gives up at
/// once and reports `SendTimeoutError::Timeout(data)`.
#[inline(always)]
pub fn try_send_realtime(&self, data: T) -> Result<(), SendTimeoutError<T>> {
let cap = self.internal.capacity();
if let Some(mut internal) = try_acquire_internal(&self.internal) {
check_recv_closed_timeout!(internal, data);
if let Some(first) = internal.next_recv() {
drop(internal);
// NOTE(review): same single-use signal assumption as in `try_send`.
unsafe { first.send(data) }
return Ok(());
}
if cap > 0 && internal.queue.len() < cap {
internal.queue.push_back(data);
return Ok(());
}
}
Err(SendTimeoutError::Timeout(data))
}
/// Returns `true` when the channel's `recv_count` is zero.
pub fn is_disconnected(&self) -> bool {
acquire_internal(&self.internal).recv_count == 0
}
};
}
// Non-blocking receive methods pasted into both `Receiver` and
// `AsyncReceiver`.
macro_rules! shared_recv_impl {
    () => {
        /// Tries to receive a value without waiting.
        ///
        /// Returns `Ok(Some(value))` when a value was available in the queue
        /// or from a waiting sender, and `Ok(None)` when nothing is available
        /// right now.
        ///
        /// # Errors
        ///
        /// Returns `ReceiveError` when `recv_count` is zero, or when nothing
        /// is available and `send_count` is zero.
        #[inline(always)]
        pub fn try_recv(&self) -> Result<Option<T>, ReceiveError> {
            let cap = self.internal.capacity();
            let mut internal = acquire_internal(&self.internal);
            if unlikely(internal.recv_count == 0) {
                return Err(ReceiveError());
            }
            if cap > 0 {
                if let Some(v) = internal.queue.pop_front() {
                    // A slot was just freed: move one waiting sender's value
                    // into the queue so that sender can proceed.
                    if let Some(p) = internal.next_send() {
                        // NOTE(review): relies on the signal contract in
                        // `signal`/`internal`; the signal came off the wait
                        // list and is consumed exactly once.
                        unsafe { internal.queue.push_back(p.recv()) }
                    }
                    return Ok(Some(v));
                }
            }
            if let Some(p) = internal.next_send() {
                drop(internal);
                // NOTE(review): same single-use signal assumption as above.
                return unsafe { Ok(Some(p.recv())) };
            }
            if unlikely(internal.send_count == 0) {
                return Err(ReceiveError());
            }
            Ok(None)
        }

        /// Same as `try_recv`, but the internal state is obtained with
        /// `try_acquire_internal`; when that yields `None` the call returns
        /// `Ok(None)` at once.
        #[inline(always)]
        pub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError> {
            let cap = self.internal.capacity();
            if let Some(mut internal) = try_acquire_internal(&self.internal) {
                if unlikely(internal.recv_count == 0) {
                    return Err(ReceiveError());
                }
                if cap > 0 {
                    if let Some(v) = internal.queue.pop_front() {
                        if let Some(p) = internal.next_send() {
                            // NOTE(review): single-use signal, see `try_recv`.
                            unsafe { internal.queue.push_back(p.recv()) }
                        }
                        return Ok(Some(v));
                    }
                }
                if let Some(p) = internal.next_send() {
                    drop(internal);
                    // NOTE(review): single-use signal, see `try_recv`.
                    return unsafe { Ok(Some(p.recv())) };
                }
                if unlikely(internal.send_count == 0) {
                    return Err(ReceiveError());
                }
            }
            Ok(None)
        }

        /// Moves everything that is currently available into `vec` without
        /// waiting and returns how many values were expected to be moved:
        /// the queue length plus, when the wait list holds senders
        /// (`recv_blocking` is false), the wait-list length.
        ///
        /// # Errors
        ///
        /// Returns `ReceiveError` when `recv_count` is zero.
        pub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError> {
            let mut internal = acquire_internal(&self.internal);
            if unlikely(internal.recv_count == 0) {
                return Err(ReceiveError());
            }
            let required_cap = internal.queue.len() + {
                if internal.recv_blocking {
                    0
                } else {
                    internal.wait_list.len()
                }
            };
            // `Vec::reserve` takes the number of *additional* elements beyond
            // the current length and is a no-op when the spare capacity
            // already suffices, so the expected count is passed as-is.
            vec.reserve(required_cap);
            vec.extend(internal.queue.drain(..));
            while let Some(p) = internal.next_send() {
                // NOTE(review): single-use signal, see `try_recv`.
                unsafe { vec.push(p.recv()) }
            }
            Ok(required_cap)
        }

        /// Returns `true` when the channel's `send_count` is zero.
        pub fn is_disconnected(&self) -> bool {
            acquire_internal(&self.internal).send_count == 0
        }

        /// Returns `true` when `send_count` is zero and the queue is empty,
        /// i.e. nothing more can be taken from the queue.
        pub fn is_terminated(&self) -> bool {
            let internal = acquire_internal(&self.internal);
            internal.send_count == 0 && internal.queue.is_empty()
        }
    };
}
impl<T> Sender<T> {
#[inline(always)]
pub fn send(&self, data: T) -> Result<(), SendError<T>> {
let cap = self.internal.capacity();
let mut internal = acquire_internal(&self.internal);
if unlikely(internal.recv_count == 0) {
drop(internal);
return Err(SendError(data));
}
if let Some(first) = internal.next_recv() {
drop(internal);
unsafe { first.send(data) }
return Ok(());
}
if cap > 0 && internal.queue.len() < cap {
internal.queue.push_back(data);
return Ok(());
}
let mut data = MaybeUninit::new(data);
let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
internal.push_signal(sig.dynamic_ptr());
drop(internal);
if unlikely(!sig.wait()) {
return Err(SendError(unsafe { data.assume_init() }));
}
Ok(())
}
pub fn send_many(&self, elements: &mut VecDeque<T>) -> Result<(), SendError<T>> {
if unlikely(elements.is_empty()) {
return Ok(());
}
let cap = self.internal.capacity();
loop {
let mut internal = acquire_internal(&self.internal);
if unlikely(internal.recv_count == 0) {
drop(internal);
return Err(SendError(elements.pop_front().unwrap()));
}
while let Some(first) = internal.next_recv() {
unsafe {
first.send(elements.pop_front().unwrap());
}
if unlikely(elements.is_empty()) {
return Ok(());
}
}
if cap > 0 {
while internal.queue.len() < cap {
if let Some(v) = elements.pop_front() {
internal.queue.push_back(v);
} else {
return Ok(());
}
}
if unlikely(elements.is_empty()) {
return Ok(());
}
}
let mut data = MaybeUninit::new(elements.pop_front().unwrap());
let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
internal.recv_blocking = false;
internal.push_signal(sig.dynamic_ptr());
drop(internal);
if unlikely(!sig.wait()) {
return Err(SendError(unsafe { data.assume_init() }));
}
if unlikely(elements.is_empty()) {
return Ok(());
}
}
}
#[inline(always)]
pub fn send_timeout(&self, data: T, duration: Duration) -> Result<(), SendTimeoutError<T>> {
let cap = self.internal.capacity();
let deadline = Instant::now().checked_add(duration).unwrap();
let mut internal = acquire_internal(&self.internal);
if unlikely(internal.recv_count == 0) {
drop(internal);
return Err(SendTimeoutError::Closed(data));
}
if let Some(first) = internal.next_recv() {
drop(internal);
unsafe { first.send(data) }
return Ok(());
}
if cap > 0 && internal.queue.len() < cap {
internal.queue.push_back(data);
return Ok(());
}
let mut data = MaybeUninit::new(data);
let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
internal.push_signal(sig.dynamic_ptr());
drop(internal);
if unlikely(!sig.wait_timeout(deadline)) {
if sig.is_terminated() {
return Err(SendTimeoutError::Closed(unsafe { data.assume_init() }));
}
{
let mut internal = acquire_internal(&self.internal);
if internal.cancel_send_signal(sig.as_tagged_ptr()) {
return Err(SendTimeoutError::Timeout(unsafe { data.assume_init() }));
}
}
if unlikely(!sig.wait()) {
return Err(SendTimeoutError::Closed(unsafe { data.assume_init() }));
}
}
Ok(())
}
shared_send_impl!();
#[cfg(feature = "async")]
pub fn clone_async(&self) -> AsyncSender<T> {
AsyncSender::<T> {
internal: self.internal.clone_send(),
}
}
#[cfg(feature = "async")]
pub fn to_async(self) -> AsyncSender<T> {
unsafe { transmute(self) }
}
#[cfg(feature = "async")]
pub fn as_async(&self) -> &AsyncSender<T> {
unsafe { transmute(self) }
}
shared_impl!();
}
#[cfg(feature = "async")]
impl<T> AsyncSender<T> {
    /// Builds a [`SendFuture`] that borrows this sender's internal state and
    /// owns `data`.
    #[inline(always)]
    pub fn send(&self, data: T) -> SendFuture<'_, T> {
        let state = &self.internal;
        SendFuture::new(state, data)
    }

    /// Builds a [`SendManyFuture`] that borrows this sender's internal state
    /// together with the caller's `elements` deque.
    #[inline(always)]
    pub fn send_many<'a, 'b>(&'a self, elements: &'b mut VecDeque<T>) -> SendManyFuture<'a, 'b, T> {
        let state = &self.internal;
        SendManyFuture::new(state, elements)
    }

    shared_send_impl!();

    /// Creates a [`Sender`] for the same channel via `Internal::clone_send`,
    /// leaving this handle usable.
    pub fn clone_sync(&self) -> Sender<T> {
        let internal = self.internal.clone_send();
        Sender { internal }
    }

    /// Converts this handle into a [`Sender`] without cloning.
    pub fn to_sync(self) -> Sender<T> {
        // SAFETY: both types are `#[repr(C)]` structs with the single field
        // `internal: Internal<T>`. `transmute` does not run `Drop` on `self`.
        unsafe { transmute(self) }
    }

    /// Borrows this handle as a [`Sender`].
    pub fn as_sync(&self) -> &Sender<T> {
        // SAFETY: both types are `#[repr(C)]` structs with the single field
        // `internal: Internal<T>`, so the reference can be reinterpreted.
        unsafe { transmute(self) }
    }

    shared_impl!();
}
/// Receiving half of a channel for synchronous (blocking) code.
///
/// The handle wraps the channel's `Internal<T>` state. Cloning goes through
/// `Internal::clone_recv` and dropping through `Internal::drop_recv`.
///
/// The type is `#[repr(C)]` with a single field, the same shape as
/// `AsyncReceiver`; the `to_async` / `as_async` conversions transmute between
/// the two.
#[cfg_attr(
feature = "async",
doc = r##"
# Examples
```
let (_s, receiver) = kanal_plus::bounded::<u64>(0);
let async_receiver=receiver.clone_async();
```
"##
)]
#[repr(C)]
pub struct Receiver<T> {
internal: Internal<T>,
}
impl<T> fmt::Debug for Receiver<T> {
    /// Writes a fixed placeholder; the channel state itself is not printed,
    /// so no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("Receiver {{ .. }}"))
    }
}
/// Receiving half of a channel for asynchronous code.
///
/// Wraps the same `Internal<T>` state as `Receiver` and is likewise
/// `#[repr(C)]` with a single field; `to_sync` / `as_sync` transmute between
/// the two types.
#[cfg(feature = "async")]
#[repr(C)]
pub struct AsyncReceiver<T> {
internal: Internal<T>,
}
#[cfg(feature = "async")]
impl<T> fmt::Debug for AsyncReceiver<T> {
    /// Writes a fixed placeholder; the channel state itself is not printed,
    /// so no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("AsyncReceiver {{ .. }}"))
    }
}
impl<T> Receiver<T> {
    /// Receives one value, blocking the current thread until one arrives.
    ///
    /// A queued value is preferred; after taking it, one waiting sender (if
    /// any) has its value moved into the freed queue slot. With an empty
    /// queue the value is taken directly from a waiting sender; otherwise a
    /// signal is pushed onto the wait list and the call blocks on it.
    ///
    /// # Errors
    ///
    /// Returns [`ReceiveError`] when `recv_count` is zero, when nothing is
    /// available and `send_count` is zero, or when the wait on the signal
    /// reports failure.
    #[inline(always)]
    pub fn recv(&self) -> Result<T, ReceiveError> {
        let cap = self.internal.capacity();
        let mut internal = acquire_internal(&self.internal);
        if unlikely(internal.recv_count == 0) {
            return Err(ReceiveError());
        }
        if cap > 0 {
            if let Some(v) = internal.queue.pop_front() {
                if let Some(p) = internal.next_send() {
                    // NOTE(review): relies on the signal contract in
                    // `signal`/`internal`; the signal came off the wait list
                    // and is consumed exactly once.
                    unsafe { internal.queue.push_back(p.recv()) }
                }
                return Ok(v);
            }
        }
        if let Some(p) = internal.next_send() {
            drop(internal);
            // NOTE(review): single-use signal, as above.
            return unsafe { Ok(p.recv()) };
        }
        if unlikely(internal.send_count == 0) {
            return Err(ReceiveError());
        }
        let mut ret = MaybeUninit::<T>::uninit();
        let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
            ret.as_mut_ptr()
        )));
        internal.push_signal(sig.dynamic_ptr());
        drop(internal);
        if unlikely(!sig.wait()) {
            return Err(ReceiveError());
        }
        // Values larger than a pointer are read from `ret`; smaller ones are
        // read back from the signal.
        // NOTE(review): presumably small values travel inside the signal's
        // pointer slot — confirm in `pointer`/`signal`.
        if size_of::<T>() > size_of::<*mut T>() {
            Ok(unsafe { ret.assume_init() })
        } else {
            Ok(unsafe { sig.assume_init() })
        }
    }

    /// Receives one value, waiting at most `duration` for it.
    ///
    /// If `Instant::now() + duration` is not representable the deadline can
    /// never be reached, so the call behaves like [`Receiver::recv`] instead
    /// of panicking.
    ///
    /// # Errors
    ///
    /// * [`ReceiveErrorTimeout::Closed`] when `recv_count` is zero, when
    ///   nothing is available and `send_count` is zero, or when the signal
    ///   was terminated.
    /// * [`ReceiveErrorTimeout::Timeout`] when the deadline passed and the
    ///   pending signal could be cancelled (or the deadline had already
    ///   passed before waiting started).
    #[inline(always)]
    pub fn recv_timeout(&self, duration: Duration) -> Result<T, ReceiveErrorTimeout> {
        let Some(deadline) = Instant::now().checked_add(duration) else {
            // Unrepresentable deadline: wait without a time limit.
            return self.recv().map_err(|_| ReceiveErrorTimeout::Closed);
        };
        let cap = self.internal.capacity();
        let mut internal = acquire_internal(&self.internal);
        if unlikely(internal.recv_count == 0) {
            return Err(ReceiveErrorTimeout::Closed);
        }
        if cap > 0 {
            if let Some(v) = internal.queue.pop_front() {
                if let Some(p) = internal.next_send() {
                    // NOTE(review): single-use signal, see `recv`.
                    unsafe { internal.queue.push_back(p.recv()) }
                }
                return Ok(v);
            }
        }
        if let Some(p) = internal.next_send() {
            drop(internal);
            // NOTE(review): single-use signal, see `recv`.
            return unsafe { Ok(p.recv()) };
        }
        if unlikely(Instant::now() > deadline) {
            return Err(ReceiveErrorTimeout::Timeout);
        }
        if unlikely(internal.send_count == 0) {
            return Err(ReceiveErrorTimeout::Closed);
        }
        let mut ret = MaybeUninit::<T>::uninit();
        let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
            ret.as_mut_ptr()
        )));
        internal.push_signal(sig.dynamic_ptr());
        drop(internal);
        if unlikely(!sig.wait_timeout(deadline)) {
            if sig.is_terminated() {
                return Err(ReceiveErrorTimeout::Closed);
            }
            {
                let mut internal = acquire_internal(&self.internal);
                // A successful cancel means the signal is no longer pending,
                // so no value is on its way.
                if internal.cancel_recv_signal(sig.as_tagged_ptr()) {
                    return Err(ReceiveErrorTimeout::Timeout);
                }
            }
            // Cancelling failed (presumably a sender already picked the
            // signal up), so wait for that hand-over to finish.
            if unlikely(!sig.wait()) {
                return Err(ReceiveErrorTimeout::Closed);
            }
        }
        if size_of::<T>() > size_of::<*mut T>() {
            Ok(unsafe { ret.assume_init() })
        } else {
            Ok(unsafe { sig.assume_init() })
        }
    }

    /// Moves everything that is currently available into `vec`; when nothing
    /// is available, blocks until a single value arrives and pushes that one.
    ///
    /// Returns the number of values appended to `vec`.
    ///
    /// # Errors
    ///
    /// Returns [`ReceiveError`] when `recv_count` is zero, when nothing is
    /// available and `send_count` is zero, or when the wait on the signal
    /// reports failure.
    pub fn drain_into_blocking(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError> {
        let vec_initial_length = vec.len();
        let mut internal = acquire_internal(&self.internal);
        if unlikely(internal.recv_count == 0) {
            return Err(ReceiveError());
        }
        let required_cap = internal.queue.len() + {
            if internal.recv_blocking {
                0
            } else {
                internal.wait_list.len()
            }
        };
        // `Vec::reserve` takes the number of *additional* elements beyond the
        // current length and is a no-op when the spare capacity already
        // suffices, so the expected count is passed as-is.
        vec.reserve(required_cap);
        vec.extend(internal.queue.drain(..));
        while let Some(p) = internal.next_send() {
            // NOTE(review): single-use signal, see `recv`.
            unsafe { vec.push(p.recv()) }
        }
        let count = vec.len() - vec_initial_length;
        if count > 0 {
            return Ok(count);
        }
        if unlikely(internal.send_count == 0) {
            return Err(ReceiveError());
        }
        let mut ret = MaybeUninit::<T>::uninit();
        let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
            ret.as_mut_ptr()
        )));
        internal.push_signal(sig.dynamic_ptr());
        drop(internal);
        if unlikely(!sig.wait()) {
            return Err(ReceiveError());
        }
        if size_of::<T>() > size_of::<*mut T>() {
            vec.push(unsafe { ret.assume_init() });
        } else {
            vec.push(unsafe { sig.assume_init() });
        }
        Ok(1)
    }

    shared_recv_impl!();

    /// Creates an [`AsyncReceiver`] for the same channel via
    /// `Internal::clone_recv`, leaving this handle usable.
    #[cfg(feature = "async")]
    pub fn clone_async(&self) -> AsyncReceiver<T> {
        AsyncReceiver::<T> {
            internal: self.internal.clone_recv(),
        }
    }

    /// Converts this handle into an [`AsyncReceiver`] without cloning.
    #[cfg(feature = "async")]
    pub fn to_async(self) -> AsyncReceiver<T> {
        // SAFETY: both types are `#[repr(C)]` structs with the single field
        // `internal: Internal<T>`. `transmute` does not run `Drop` on `self`.
        unsafe { transmute(self) }
    }

    /// Borrows this handle as an [`AsyncReceiver`].
    #[cfg(feature = "async")]
    pub fn as_async(&self) -> &AsyncReceiver<T> {
        // SAFETY: both types are `#[repr(C)]` structs with the single field
        // `internal: Internal<T>`, so the reference can be reinterpreted.
        unsafe { transmute(self) }
    }

    shared_impl!();
}
impl<T> Iterator for Receiver<T> {
    type Item = T;

    /// Blocks in `recv` for the next value; any receive error ends the
    /// iteration by yielding `None`.
    fn next(&mut self) -> Option<Self::Item> {
        let outcome = self.recv();
        outcome.ok()
    }
}
#[cfg(feature = "async")]
impl<T> AsyncReceiver<T> {
    /// Builds a [`ReceiveFuture`] that borrows this receiver's internal
    /// state.
    #[inline(always)]
    pub fn recv(&self) -> ReceiveFuture<'_, T> {
        let state = &self.internal;
        ReceiveFuture::new_ref(state)
    }

    /// Builds a [`ReceiveStream`] that borrows this receiver.
    #[inline(always)]
    pub fn stream(&self) -> ReceiveStream<'_, T> {
        ReceiveStream::new_borrowed(self)
    }

    /// Consumes this receiver and wraps it in a [`ReceiveStreamOwned`].
    #[inline(always)]
    pub fn into_stream(self) -> ReceiveStreamOwned<T> {
        ReceiveStreamOwned::new(self)
    }

    /// Builds a [`DrainIntoBlockingFuture`] that borrows this receiver's
    /// internal state together with the caller's `vec`.
    #[inline(always)]
    pub fn drain_into_blocking<'a, 'b>(
        &'a self,
        vec: &'b mut Vec<T>,
    ) -> DrainIntoBlockingFuture<'a, 'b, T> {
        let state = &self.internal;
        DrainIntoBlockingFuture::new(state, vec)
    }

    shared_recv_impl!();

    /// Creates a [`Receiver`] for the same channel via
    /// `Internal::clone_recv`, leaving this handle usable.
    pub fn clone_sync(&self) -> Receiver<T> {
        let internal = self.internal.clone_recv();
        Receiver { internal }
    }

    /// Converts this handle into a [`Receiver`] without cloning.
    pub fn to_sync(self) -> Receiver<T> {
        // SAFETY: both types are `#[repr(C)]` structs with the single field
        // `internal: Internal<T>`. `transmute` does not run `Drop` on `self`.
        unsafe { transmute(self) }
    }

    /// Borrows this handle as a [`Receiver`].
    pub fn as_sync(&self) -> &Receiver<T> {
        // SAFETY: both types are `#[repr(C)]` structs with the single field
        // `internal: Internal<T>`, so the reference can be reinterpreted.
        unsafe { transmute(self) }
    }

    shared_impl!();
}
impl<T> Drop for Receiver<T> {
    /// Delegates the teardown of this receiving handle to
    /// `Internal::drop_recv`.
    fn drop(&mut self) {
        let Self { internal } = self;
        internal.drop_recv();
    }
}
#[cfg(feature = "async")]
impl<T> Drop for AsyncReceiver<T> {
    /// Delegates the teardown of this receiving handle to
    /// `Internal::drop_recv`.
    fn drop(&mut self) {
        let Self { internal } = self;
        internal.drop_recv();
    }
}
impl<T> Clone for Receiver<T> {
    /// Produces another receiving handle for the same channel by way of
    /// `Internal::clone_recv`.
    fn clone(&self) -> Self {
        let internal = self.internal.clone_recv();
        Self { internal }
    }
}
#[cfg(feature = "async")]
impl<T> Clone for AsyncReceiver<T> {
    /// Produces another receiving handle for the same channel by way of
    /// `Internal::clone_recv`.
    fn clone(&self) -> Self {
        let internal = self.internal.clone_recv();
        Self { internal }
    }
}
/// Creates a channel of the given `size` and returns its synchronous
/// `(Sender, Receiver)` pair.
///
/// Both handles share one `Internal` state built with
/// `Internal::new(true, size)`. With `size == 0` the send paths never place
/// values in the queue, because they only enqueue when the capacity is
/// greater than zero.
pub fn bounded<T>(size: usize) -> (Sender<T>, Receiver<T>) {
    let receiver_state = Internal::new(true, size);
    let sender_state = receiver_state.clone_unchecked();
    let sender = Sender {
        internal: sender_state,
    };
    let receiver = Receiver {
        internal: receiver_state,
    };
    (sender, receiver)
}
/// Creates a channel of the given `size` and returns its asynchronous
/// `(AsyncSender, AsyncReceiver)` pair.
///
/// Both handles share one `Internal` state built with
/// `Internal::new(true, size)`.
#[cfg(feature = "async")]
pub fn bounded_async<T>(size: usize) -> (AsyncSender<T>, AsyncReceiver<T>) {
    let receiver_state = Internal::new(true, size);
    let sender_state = receiver_state.clone_unchecked();
    let sender = AsyncSender {
        internal: sender_state,
    };
    let receiver = AsyncReceiver {
        internal: receiver_state,
    };
    (sender, receiver)
}
// Second argument handed to `Internal::new` by `unbounded` and
// `unbounded_async`.
// NOTE(review): presumably the initial queue allocation rather than a limit —
// confirm in `internal`.
const UNBOUNDED_STARTING_SIZE: usize = 32;
/// Creates an unbounded channel and returns its synchronous
/// `(Sender, Receiver)` pair.
///
/// Both handles share one `Internal` state built with
/// `Internal::new(false, UNBOUNDED_STARTING_SIZE)`.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let receiver_state = Internal::new(false, UNBOUNDED_STARTING_SIZE);
    let sender_state = receiver_state.clone_unchecked();
    let sender = Sender {
        internal: sender_state,
    };
    let receiver = Receiver {
        internal: receiver_state,
    };
    (sender, receiver)
}
/// Creates an unbounded channel and returns its asynchronous
/// `(AsyncSender, AsyncReceiver)` pair.
///
/// Both handles share one `Internal` state built with
/// `Internal::new(false, UNBOUNDED_STARTING_SIZE)`.
#[cfg(feature = "async")]
pub fn unbounded_async<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
    let receiver_state = Internal::new(false, UNBOUNDED_STARTING_SIZE);
    let sender_state = receiver_state.clone_unchecked();
    let sender = AsyncSender {
        internal: sender_state,
    };
    let receiver = AsyncReceiver {
        internal: receiver_state,
    };
    (sender, receiver)
}