#![allow(unknown_lints)]
use super::parker::Parker;
use std::{
cell::UnsafeCell,
mem::MaybeUninit,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Once,
},
};
/// Heap-allocated state shared by the [`Receiver`] and every [`Sender`] of one channel.
///
/// The handles hold raw pointers to this struct; it is created by `channel` and freed
/// by whichever handle's teardown decides it is the last user.
struct Inner<T> {
/// Number of live `Sender` handles: starts at 1, incremented by `Sender::clone`,
/// decremented when a `Sender` is dropped.
sender_count: AtomicUsize,
/// Starts `true`; stored `false` once the receiver is dropped or consumed by
/// `recv_now`. Senders read it to detect a closed channel.
recver: AtomicBool,
/// Guards `value`: whoever runs its `call_once` closure first decides the slot's fate.
/// A sender's closure writes the value; the receiver's closure marks the slot as
/// permanently empty. After completion no further write can happen.
lock: Once,
/// Slot for the single value. Initialized only by a sender inside `lock.call_once`.
value: UnsafeCell<MaybeUninit<T>>,
}
/// Receiving half of the channel returned by `channel`.
///
/// Wraps a raw pointer to the shared `Inner<T>`; the derived `Debug` output therefore
/// shows the pointer, not the value in flight.
#[derive(Debug)]
pub struct Receiver<T>(std::ptr::NonNull<Inner<T>>);
/// Sending half of the channel returned by `channel`.
///
/// Can be cloned; every clone points at the same shared `Inner<T>` and is counted in
/// `Inner::sender_count`. The derived `Debug` output shows the raw pointer.
#[derive(Debug)]
pub struct Sender<T>(std::ptr::NonNull<Inner<T>>);
// SAFETY: moving a `Receiver<T>` to another thread lets that thread take or drop the
// `T` stored in the shared slot, so `T` itself must be `Send`.
unsafe impl<T: Send> Send for Receiver<T> {}
// SAFETY: moving a `Sender<T>` to another thread lets that thread hand a `T` to the
// receiver's thread, so `T` must be `Send`.
unsafe impl<T: Send> Send for Sender<T> {}
// SAFETY: through `&Receiver<T>` only `is_empty` and `peek` are reachable. `is_empty`
// never touches the value and `peek` carries its own `T: Sync` bound, so no bound is
// needed here.
unsafe impl<T> Sync for Receiver<T> {}
// SAFETY: `&Sender<T>` can be cloned into an owned `Sender<T>` on the borrowing thread,
// which can then `send` a `T` created there to the receiver's thread. Sharing a sender
// is therefore only sound when `T: Send`; without the bound a `!Send` value (e.g. an
// `Rc`) could cross threads.
unsafe impl<T: Send> Sync for Sender<T> {}
impl<T> Clone for Sender<T> {
    /// Creates another sending handle for the same channel and records it in the
    /// shared sender count.
    fn clone(&self) -> Self {
        // SAFETY: `self` is a live sender, so the shared allocation has not been freed.
        let shared = unsafe { self.0.as_ref() };
        // Relaxed suffices: the new handle is derived from an existing one, so the
        // count cannot concurrently reach zero.
        shared.sender_count.fetch_add(1, Ordering::Relaxed);
        Self(self.0)
    }
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let inner_ref = unsafe { self.0.as_ref() };
inner_ref.recver.store(false, Ordering::Release);
let mut has_value = true;
inner_ref.lock.call_once(|| has_value = false);
if has_value {
unsafe { inner_ref.value.get().as_mut().unwrap_unchecked().assume_init_drop() };
}
if inner_ref.sender_count.load(Ordering::Acquire) == 0 {
#[allow(dropping_references)]
std::mem::drop(inner_ref);
std::mem::drop(unsafe { Box::from_raw(self.0.as_ptr()) });
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let inner_ref = unsafe { self.0.as_ref() };
if inner_ref.sender_count.fetch_sub(1, Ordering::Release) == 1 {
#[allow(clippy::collapsible_if)]
if !inner_ref.recver.load(Ordering::Acquire) {
#[allow(dropping_references)]
std::mem::drop(inner_ref);
std::mem::drop(unsafe { Box::from_raw(self.0.as_ptr()) });
}
}
}
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Box::new(Inner {
sender_count: AtomicUsize::new(1),
recver: AtomicBool::new(true),
lock: Once::new(),
value: UnsafeCell::new(MaybeUninit::uninit()),
});
let inner = std::ptr::NonNull::new(Box::into_raw(inner)).unwrap();
(Sender(inner), Receiver(inner))
}
impl<T> Receiver<T> {
pub fn is_empty(&self) -> bool {
let inner_ref = unsafe { self.0.as_ref() };
!inner_ref.lock.is_completed()
}
pub fn peek(&self) -> Option<&T>
where
T: Sync,
{
let inner_ref = unsafe { self.0.as_ref() };
inner_ref
.lock
.is_completed()
.then(|| unsafe { inner_ref.value.get().as_ref().unwrap_unchecked().assume_init_ref() })
}
pub fn recv_now(self) -> Option<T> {
let ptr = self.0;
let inner_ref = unsafe { ptr.as_ref() };
std::mem::forget(self);
inner_ref.recver.store(false, Ordering::Release);
let mut has_value = true;
inner_ref.lock.call_once(|| has_value = false);
let retval = has_value.then(|| unsafe {
inner_ref.value.get().as_ref().unwrap_unchecked().assume_init_read()
});
if inner_ref.sender_count.load(Ordering::Acquire) == 0 {
#[allow(dropping_references)]
std::mem::drop(inner_ref);
std::mem::drop(unsafe { Box::from_raw(ptr.as_ptr()) });
}
retval
}
pub fn recv(self, parker: &Parker) -> Option<T> {
if self.is_empty() {
parker.park();
}
self.recv_now()
}
}
impl<T> Sender<T> {
    /// Reports whether a send through this channel can no longer succeed: either the
    /// receiver has closed the channel or the value slot has already been settled.
    pub fn is_closed(&self) -> bool {
        // SAFETY: a live sender keeps the shared allocation alive.
        let shared = unsafe { self.0.as_ref() };
        if !shared.recver.load(Ordering::Relaxed) {
            return true;
        }
        shared.lock.is_completed()
    }

    /// Consumes this sender and tries to deliver `value`.
    ///
    /// Returns `Ok(())` if this call stored the value, and hands the value back as
    /// `Err(value)` if the receiver is gone or the slot was already settled by someone
    /// else.
    pub fn send(self, value: T) -> Result<(), T> {
        // SAFETY: `self` is alive until the end of this function, so the shared
        // allocation is too.
        let shared = unsafe { self.0.as_ref() };
        let mut pending = Some(value);
        // Cheap early-out; the `Once` below is what actually arbitrates the slot.
        if shared.recver.load(Ordering::Relaxed) {
            shared.lock.call_once(|| {
                if let Some(payload) = pending.take() {
                    // SAFETY: `call_once` runs at most one closure, so this is the
                    // only write to the slot and nobody reads it before completion.
                    unsafe {
                        (*shared.value.get()).write(payload);
                    }
                }
            });
        }
        // `pending` is still populated exactly when our closure did not run.
        match pending {
            None => Ok(()),
            Some(unsent) => Err(unsent),
        }
    }
}
impl<T> super::Sender for Option<Sender<T>> {
    type Value = T;

    /// Takes the wrapped sender (leaving `None` behind) and attempts the one possible
    /// send. Always breaks: with `Sent::Ok` on success, with `Sent::Closed` if there
    /// was no sender left or the channel rejected the value.
    fn send(&mut self, value: Self::Value) -> std::ops::ControlFlow<super::Sent> {
        let outcome = match self.take() {
            None => super::Sent::Closed,
            Some(sender) => match sender.send(value) {
                Ok(()) => super::Sent::Ok,
                Err(_) => super::Sent::Closed,
            },
        };
        std::ops::ControlFlow::Break(outcome)
    }

    /// Returns `true` only while a sender is still held and its channel is not closed.
    fn may_send(&self) -> bool {
        match self {
            Some(sender) => !sender.is_closed(),
            None => false,
        }
    }
}