#![doc = include_str!("../README.md")]
use std::borrow::Borrow;
use std::fmt;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use std::process::abort;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
#[cfg(loom)]
use loom::sync::atomic::AtomicU64;
#[cfg(not(loom))]
use std::sync::atomic::AtomicU64;
#[cfg(doc)]
use std::marker::Unpin;
/// Callbacks delivered to the shared value when one half of the split
/// reference count loses its final handle while the other half is still
/// alive.
///
/// Every method has a default body, so an implementor overrides only the
/// hooks it cares about. The `*_pinned` variants are the ones invoked by the
/// handle destructors; by default each forwards to its unpinned sibling.
pub trait Notify {
    /// Pinned hook for the last `Tx` going away.
    ///
    /// Forwards to [`Notify::last_tx_did_drop`] unless overridden.
    fn last_tx_did_drop_pinned(self: Pin<&Self>) {
        Pin::get_ref(self).last_tx_did_drop();
    }

    /// Hook for the last `Tx` going away. Does nothing by default.
    ///
    /// This runs from inside a destructor, so it should not block or panic.
    fn last_tx_did_drop(&self) {}

    /// Pinned hook for the last `Rx` going away.
    ///
    /// Forwards to [`Notify::last_rx_did_drop`] unless overridden.
    fn last_rx_did_drop_pinned(self: Pin<&Self>) {
        Pin::get_ref(self).last_rx_did_drop();
    }

    /// Hook for the last `Rx` going away. Does nothing by default.
    ///
    /// This runs from inside a destructor, so it should not block or panic.
    fn last_rx_did_drop(&self) {}
}
// Layout of the single 64-bit counter word, from least to most significant:
//
//   bits  0..=1   drop count (2 bits)
//   bits  2..=32  rx count   (31 bits)
//   bits 33..=63  tx count   (31 bits)

/// Bit offset of the drop-count field.
const DC_SHIFT: u8 = 0;
/// Bit offset of the rx-count field.
const RX_SHIFT: u8 = 2;
/// Bit offset of the tx-count field.
const TX_SHIFT: u8 = 33;

/// Mask applied to the drop count after shifting (two bits).
const DC_MASK: u8 = 0b11;
/// Mask applied to the rx count after shifting (31 bits).
const RX_MASK: u32 = u32::MAX >> 1;
/// Mask applied to the tx count after shifting (31 bits).
const TX_MASK: u32 = u32::MAX >> 1;

/// Value that adds one to the drop-count field.
const DC_INC: u64 = 1 << DC_SHIFT;
/// Value that adds one to the rx-count field.
const RX_INC: u64 = 1 << RX_SHIFT;
/// Value that adds one to the tx-count field.
const TX_INC: u64 = 1 << TX_SHIFT;

/// Initial counter word: one tx reference, one rx reference, drop count zero.
const RC_INIT: u64 = TX_INC | RX_INC;
/// Extracts the tx-count field from a packed counter word.
fn tx_count(c: u64) -> u32 {
    let shifted = c >> TX_SHIFT;
    // Truncate to 32 bits first, then keep only the bits of the field.
    TX_MASK & shifted as u32
}
/// Extracts the rx-count field from a packed counter word.
fn rx_count(c: u64) -> u32 {
    let shifted = c >> RX_SHIFT;
    // The truncating cast still carries the lowest tx bit; the mask removes it.
    RX_MASK & shifted as u32
}
/// Extracts the drop-count field from a packed counter word.
fn drop_count(c: u64) -> u8 {
    let shifted = c >> DC_SHIFT;
    // The truncating cast keeps some rx bits; the mask removes them.
    DC_MASK & shifted as u8
}
/// Count at or above which an increment is rolled back and the caller panics.
///
/// The panic zone starts far below the abort zone so that many threads can
/// be mid-increment at once (each about to undo its own increment) without
/// the count ever reaching [`OVERFLOW_ABORT`].
const OVERFLOW_PANIC: u32 = 1 << 30;

/// Count at or above which the process aborts instead of panicking.
///
/// The per-half counts are 31-bit fields (see `TX_MASK` / `RX_MASK`), so the
/// largest value a count can ever be observed at is `(1 << 31) - 1`. The
/// threshold therefore has to sit below that maximum; a threshold near
/// `u32::MAX` could never be reached and the abort safety net would be dead
/// code, leaving a field free to overflow into its neighbour. A margin of
/// `1 << 16` increments is left between the threshold and the field maximum.
const OVERFLOW_ABORT: u32 = ((1 << 31) - 1) - (1 << 16);
/// Two reference counts (tx and rx) plus a small "drop count", packed into
/// one atomic 64-bit word so that every transition sees all three fields
/// together. Field positions are given by the `*_SHIFT` / `*_MASK` constants.
struct SplitCount(AtomicU64);
impl SplitCount {
/// Creates a counter holding one tx reference, one rx reference and a drop
/// count of zero (`RC_INIT`).
fn new() -> Self {
Self(AtomicU64::new(RC_INIT))
}
/// Adds one tx reference.
///
/// # Panics
///
/// If the tx count observed *before* the increment is at or above
/// `OVERFLOW_PANIC`, the slow path in `inc_tx_overflow` runs: it either
/// aborts the process or undoes the increment and panics.
fn inc_tx(&self) {
// NOTE(review): Relaxed looks sufficient because an increment is only ever
// made through an existing handle (see `Clone for Tx`), the same reasoning
// `Arc` uses for its own increments - confirm.
let old = self.0.fetch_add(TX_INC, Ordering::Relaxed);
if tx_count(old) < OVERFLOW_PANIC {
return;
}
self.inc_tx_overflow(old)
}
/// Slow path of `inc_tx`; `old` is the counter word as it was before the
/// increment. Aborts when the old tx count is at or above `OVERFLOW_ABORT`,
/// otherwise rolls the increment back and panics with "tx count overflow".
#[cold]
fn inc_tx_overflow(&self, old: u64) {
if tx_count(old) >= OVERFLOW_ABORT {
abort()
} else {
// Undo our own increment so the count stays accurate for the handles that
// do exist, then report the failure to the caller.
self.0.fetch_sub(TX_INC, Ordering::Relaxed);
panic!("tx count overflow")
}
}
/// Removes one tx reference and reports what the caller must do next.
///
/// * `Nothing` - other tx handles remain, or both halves are gone but the
///   rx side has not yet recorded its drop and so stays responsible for
///   the deallocation.
/// * `Notify` - this was the last tx handle and rx handles remain.
/// * `Drop` - this was the last handle of either kind and the drop count
///   had already been bumped once; the caller deallocates.
#[inline]
fn dec_tx(&self) -> DecrementAction {
let mut action = DecrementAction::Nothing;
self.0
// The closure may run several times under contention; only the attempt
// whose value is actually stored determines the final `action`.
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |mut current| {
// Give up our own reference.
current -= TX_INC;
if tx_count(current) == 0 {
// We held the last tx reference.
action = if rx_count(current) == 0 {
// Both halves are gone: record our drop in the same atomic update.
current += DC_INC;
if drop_count(current) == 1 {
// The drop count was zero before our bump, so the rx side has not called
// `inc_drop_count` yet; when it does it will see a word equal to 1 and
// deallocate.
DecrementAction::Nothing
} else {
// The drop count had already been bumped; we are the final owner.
DecrementAction::Drop
}
} else {
// Rx handles are still alive: tell the shared value the tx side is gone.
DecrementAction::Notify
}
} else {
// NOTE(review): `action` is deliberately not reset here. That relies on the
// tx count never growing again once this handle has been observed as the
// last one - confirm.
}
Some(current)
})
// The closure always returns `Some`, so `fetch_update` cannot return `Err`.
.unwrap();
action
}
/// Adds one rx reference.
///
/// # Panics
///
/// If the rx count observed *before* the increment is at or above
/// `OVERFLOW_PANIC`, the slow path in `inc_rx_overflow` runs: it either
/// aborts the process or undoes the increment and panics.
fn inc_rx(&self) {
// NOTE(review): Relaxed for the same reason as in `inc_tx` - confirm.
let old = self.0.fetch_add(RX_INC, Ordering::Relaxed);
if rx_count(old) < OVERFLOW_PANIC {
return;
}
self.inc_rx_overflow(old)
}
/// Slow path of `inc_rx`; `old` is the counter word as it was before the
/// increment. Aborts when the old rx count is at or above `OVERFLOW_ABORT`,
/// otherwise rolls the increment back and panics with "rx count overflow".
#[cold]
fn inc_rx_overflow(&self, old: u64) {
if rx_count(old) >= OVERFLOW_ABORT {
abort()
} else {
// Undo our own increment before unwinding.
self.0.fetch_sub(RX_INC, Ordering::Relaxed);
panic!("rx count overflow")
}
}
/// Removes one rx reference and reports what the caller must do next.
///
/// Mirror image of `dec_tx` with the roles of the two counts swapped.
#[inline]
fn dec_rx(&self) -> DecrementAction {
let mut action = DecrementAction::Nothing;
self.0
// The closure may run several times under contention; only the attempt
// whose value is actually stored determines the final `action`.
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |mut current| {
// Give up our own reference.
current -= RX_INC;
if rx_count(current) == 0 {
// We held the last rx reference.
action = if tx_count(current) == 0 {
// Both halves are gone: record our drop in the same atomic update.
current += DC_INC;
if drop_count(current) == 1 {
// The tx side has not called `inc_drop_count` yet; it will deallocate.
DecrementAction::Nothing
} else {
// The drop count had already been bumped; we are the final owner.
DecrementAction::Drop
}
} else {
// Tx handles are still alive: tell the shared value the rx side is gone.
DecrementAction::Notify
}
} else {
// NOTE(review): `action` is deliberately not reset here; see the matching
// note in `dec_tx` - confirm.
}
Some(current)
})
// The closure always returns `Some`, so `fetch_update` cannot return `Err`.
.unwrap();
action
}
/// Bumps the drop count once a notification callback has finished.
///
/// Returns `true` when the whole word was exactly `1` before the bump, i.e.
/// both reference counts were zero and the drop count was one. In that case
/// the other half has already dropped its last handle and left the
/// deallocation to the caller.
fn inc_drop_count(&self) -> bool {
1 == self.0.fetch_add(DC_INC, Ordering::AcqRel)
}
}
/// What a handle's destructor must do after giving up its reference, as
/// decided by `SplitCount::dec_tx` / `SplitCount::dec_rx`.
enum DecrementAction {
// No follow-up work for this handle.
Nothing,
// This was the last handle of its half while the other half is still
// alive: run the notification hook, then bump the drop count.
Notify,
// This was the last handle overall: free the shared allocation.
Drop,
}
/// Heap block shared by every `Tx` and `Rx` handle created from one call to
/// `new`: the user's value next to its packed reference counts.
struct Inner<T> {
// The user-supplied value handed out through `Deref`.
data: T,
// Packed tx / rx / drop counters guarding the lifetime of this block.
count: SplitCount,
}
/// Reclaims the shared allocation behind `ptr`, running `T`'s destructor.
///
/// Callers must only invoke this once, after the counters have established
/// that no other handle can still reach the allocation.
fn deallocate<T>(ptr: NonNull<Inner<T>>) {
    // SAFETY: `ptr` originates from `Box::into_raw` in `new`, and the
    // reference-count protocol hands the allocation to exactly one caller.
    let owned = unsafe { Box::from_raw(ptr.as_ptr()) };
    drop(owned);
}
/// One of the two handle kinds returned by [`new`]; dereferences to the
/// shared `T`.
///
/// Cloning adds to the tx count and dropping subtracts from it. When the
/// last `Tx` is dropped while `Rx` handles remain, the shared value's
/// [`Notify::last_tx_did_drop_pinned`] hook runs.
pub struct Tx<T: Notify> {
// Shared heap block holding the value and its counters.
ptr: NonNull<Inner<T>>,
// Marker recording that this handle logically holds a `T`.
phantom: PhantomData<T>,
}
// SAFETY: handles on different threads all reach the same `T` through shared
// references (requires `T: Sync`), and whichever handle is dropped last
// destroys the `T` on its own thread (requires `T: Send`). These are the
// same bounds `Arc<T>` uses.
unsafe impl<T: Sync + Send + Notify> Send for Tx<T> {}
unsafe impl<T: Sync + Send + Notify> Sync for Tx<T> {}
impl<T: Notify> Drop for Tx<T> {
    /// Gives up this handle's tx reference, notifying the shared value if it
    /// was the last `Tx`, and frees the allocation if nothing else can reach
    /// it any more.
    fn drop(&mut self) {
        // SAFETY: the allocation stays alive while this handle holds its
        // reference, and only shared references to it are ever created.
        let shared = unsafe { self.ptr.as_ref() };

        let must_free = match shared.count.dec_tx() {
            DecrementAction::Nothing => false,
            DecrementAction::Drop => true,
            DecrementAction::Notify => {
                // SAFETY: the value lives in a heap allocation and is never
                // moved out of it, so handing out a pinned reference is sound.
                let pinned = unsafe { Pin::new_unchecked(&shared.data) };
                pinned.last_tx_did_drop_pinned();
                // The drop count is bumped only after the hook returns; a
                // `true` result means the rx side left the cleanup to us.
                shared.count.inc_drop_count()
            }
        };

        if must_free {
            deallocate(self.ptr);
        }
    }
}
impl<T: Notify> Clone for Tx<T> {
    /// Creates another `Tx` handle to the same shared value.
    ///
    /// # Panics
    ///
    /// Panics (or aborts) if the tx count has grown into the overflow zone.
    fn clone(&self) -> Self {
        // SAFETY: the allocation is alive for as long as `self` exists, and
        // only a shared reference is created.
        let shared = unsafe { self.ptr.as_ref() };
        // Count the new handle before it exists, so a panic here leaves no
        // handle behind that would later decrement.
        shared.count.inc_tx();
        Self {
            ptr: self.ptr,
            phantom: self.phantom,
        }
    }
}
impl<T: Notify> Deref for Tx<T> {
    type Target = T;

    /// Borrows the shared value.
    fn deref(&self) -> &T {
        // SAFETY: the allocation is alive for as long as `self` exists, and
        // only a shared reference is created.
        let shared = unsafe { self.ptr.as_ref() };
        &shared.data
    }
}
impl<T: Notify> AsRef<T> for Tx<T> {
fn as_ref(&self) -> &T {
self.deref()
}
}
impl<T: Notify> Borrow<T> for Tx<T> {
fn borrow(&self) -> &T {
self.deref()
}
}
impl<T: Notify + fmt::Debug> fmt::Debug for Tx<T> {
    /// Formats the shared value with its own `Debug` impl; the handle adds
    /// no decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let value: &T = self.as_ref();
        <T as fmt::Debug>::fmt(value, f)
    }
}
impl<T: Notify + fmt::Display> fmt::Display for Tx<T> {
    /// Formats the shared value with its own `Display` impl; the handle adds
    /// no decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let value: &T = self.as_ref();
        <T as fmt::Display>::fmt(value, f)
    }
}
/// One of the two handle kinds returned by [`new`]; dereferences to the
/// shared `T`.
///
/// Cloning adds to the rx count and dropping subtracts from it. When the
/// last `Rx` is dropped while `Tx` handles remain, the shared value's
/// [`Notify::last_rx_did_drop_pinned`] hook runs.
pub struct Rx<T: Notify> {
// Shared heap block holding the value and its counters.
ptr: NonNull<Inner<T>>,
// Marker recording that this handle logically holds a `T`.
phantom: PhantomData<T>,
}
// SAFETY: handles on different threads all reach the same `T` through shared
// references (requires `T: Sync`), and whichever handle is dropped last
// destroys the `T` on its own thread (requires `T: Send`). These are the
// same bounds `Arc<T>` uses.
unsafe impl<T: Sync + Send + Notify> Send for Rx<T> {}
unsafe impl<T: Sync + Send + Notify> Sync for Rx<T> {}
impl<T: Notify> Drop for Rx<T> {
    /// Gives up this handle's rx reference, notifying the shared value if it
    /// was the last `Rx`, and frees the allocation if nothing else can reach
    /// it any more.
    fn drop(&mut self) {
        // SAFETY: the allocation stays alive while this handle holds its
        // reference, and only shared references to it are ever created.
        let shared = unsafe { self.ptr.as_ref() };

        let must_free = match shared.count.dec_rx() {
            DecrementAction::Nothing => false,
            DecrementAction::Drop => true,
            DecrementAction::Notify => {
                // SAFETY: the value lives in a heap allocation and is never
                // moved out of it, so handing out a pinned reference is sound.
                let pinned = unsafe { Pin::new_unchecked(&shared.data) };
                pinned.last_rx_did_drop_pinned();
                // The drop count is bumped only after the hook returns; a
                // `true` result means the tx side left the cleanup to us.
                shared.count.inc_drop_count()
            }
        };

        if must_free {
            deallocate(self.ptr);
        }
    }
}
impl<T: Notify> Clone for Rx<T> {
    /// Creates another `Rx` handle to the same shared value.
    ///
    /// # Panics
    ///
    /// Panics (or aborts) if the rx count has grown into the overflow zone.
    fn clone(&self) -> Self {
        // SAFETY: the allocation is alive for as long as `self` exists, and
        // only a shared reference is created.
        let shared = unsafe { self.ptr.as_ref() };
        // Count the new handle before it exists, so a panic here leaves no
        // handle behind that would later decrement.
        shared.count.inc_rx();
        Self {
            ptr: self.ptr,
            phantom: self.phantom,
        }
    }
}
impl<T: Notify> Deref for Rx<T> {
    type Target = T;

    /// Borrows the shared value.
    fn deref(&self) -> &T {
        // SAFETY: the allocation is alive for as long as `self` exists, and
        // only a shared reference is created.
        let shared = unsafe { self.ptr.as_ref() };
        &shared.data
    }
}
impl<T: Notify> AsRef<T> for Rx<T> {
fn as_ref(&self) -> &T {
self.deref()
}
}
impl<T: Notify> Borrow<T> for Rx<T> {
fn borrow(&self) -> &T {
self.deref()
}
}
impl<T: Notify + fmt::Debug> fmt::Debug for Rx<T> {
    /// Formats the shared value with its own `Debug` impl; the handle adds
    /// no decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let value: &T = self.as_ref();
        <T as fmt::Debug>::fmt(value, f)
    }
}
impl<T: Notify + fmt::Display> fmt::Display for Rx<T> {
    /// Formats the shared value with its own `Display` impl; the handle adds
    /// no decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let value: &T = self.as_ref();
        <T as fmt::Display>::fmt(value, f)
    }
}
/// Moves `data` to the heap and returns one `Tx` and one `Rx` handle to it.
///
/// The counters start at one tx reference and one rx reference. Further
/// handles of either kind are obtained by cloning.
pub fn new<T: Notify>(data: T) -> (Tx<T>, Rx<T>) {
    let shared = Inner {
        data,
        count: SplitCount::new(),
    };
    let raw = Box::into_raw(Box::new(shared));
    // SAFETY: `Box::into_raw` never yields a null pointer.
    let ptr = unsafe { NonNull::new_unchecked(raw) };

    let tx = Tx {
        ptr,
        phantom: PhantomData,
    };
    let rx = Rx {
        ptr,
        phantom: PhantomData,
    };
    (tx, rx)
}
/// Like [`new`], but returns both handles wrapped in [`Pin`].
pub fn pin<T: Notify>(data: T) -> (Pin<Tx<T>>, Pin<Rx<T>>) {
    let (tx, rx) = new(data);
    // SAFETY: the value lives in a heap allocation that the handles never
    // move it out of, and the handles expose only shared access to it.
    let pinned_tx = unsafe { Pin::new_unchecked(tx) };
    let pinned_rx = unsafe { Pin::new_unchecked(rx) };
    (pinned_tx, pinned_rx)
}