#![doc = include_str!("../README.md")]
use std::borrow::Borrow;
use std::fmt;
use std::marker::PhantomData;
use std::ops::Deref;
use std::process::abort;
use std::ptr::NonNull;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
/// Hooks through which the shared value learns that one side has gone away.
///
/// Both hooks default to doing nothing, so an implementor overrides only the
/// notifications it cares about. The handle whose drop brings *both* counts
/// to zero frees the allocation instead of calling a hook.
pub trait Notify {
    /// Called from `Tx`'s destructor when the final `Tx` is dropped while at
    /// least one `Rx` still exists.
    fn last_tx_did_drop(&self) {
        // No-op unless overridden.
    }

    /// Called from `Rx`'s destructor when the final `Rx` is dropped while at
    /// least one `Tx` still exists.
    fn last_rx_did_drop(&self) {
        // No-op unless overridden.
    }
}
/// Amount added to the packed counter for one `Rx` handle (low 32 bits).
const RX_INC: u64 = 1;
/// Amount added to the packed counter for one `Tx` handle (high 32 bits).
const TX_INC: u64 = RX_INC << 32;
/// Initial packed value: exactly one `Tx` and one `Rx`.
const RC_INIT: u64 = TX_INC | RX_INC;
/// An increment that finds the previous per-side count at or above this value
/// is undone and reported with a panic.
const OVERFLOW_PANIC: u32 = 0x8000_0000;
/// An increment that finds the previous per-side count at or above this value
/// aborts the process instead of panicking.
const OVERFLOW_ABORT: u32 = u32::MAX - 0x1_0000;
/// Two 32-bit reference counts packed into a single atomic 64-bit word.
///
/// The `Tx` count occupies the upper 32 bits and the `Rx` count the lower
/// 32 bits (see `tx_count` / `rx_count` and `TX_INC` / `RX_INC`), so one
/// atomic operation observes both counts at once.
struct SplitCount {
// Packed as `(tx << 32) | rx`.
count: AtomicU64,
}
impl SplitCount {
    /// Builds a counter that already accounts for one `Tx` and one `Rx`
    /// (`RC_INIT`).
    fn new() -> Self {
        let count = AtomicU64::new(RC_INIT);
        Self { count }
    }

    /// Registers one additional `Tx` handle (relaxed `fetch_add`).
    ///
    /// # Panics
    ///
    /// Panics with "tx count overflow", after undoing the increment, when the
    /// previous `Tx` count was at or above `OVERFLOW_PANIC`. Aborts the
    /// process when it was at or above `OVERFLOW_ABORT`.
    fn inc_tx(&self) {
        let previous = self.count.fetch_add(TX_INC, Ordering::Relaxed);
        if tx_count(previous) >= OVERFLOW_PANIC {
            self.inc_tx_overflow(previous);
        }
    }

    /// Slow path of `inc_tx`; `old` is the packed value seen before the
    /// increment. Never returns normally.
    #[cold]
    fn inc_tx_overflow(&self, old: u64) {
        if tx_count(old) >= OVERFLOW_ABORT {
            abort();
        }
        // Roll the increment back so the count stays accurate after unwinding.
        self.count.fetch_sub(TX_INC, Ordering::Relaxed);
        panic!("tx count overflow");
    }

    /// Releases one `Tx` handle and reports what the caller must do next.
    ///
    /// Returns `Drop` when this was the last `Tx` and no `Rx` remains,
    /// `Notify` when this was the last `Tx` but some `Rx` remains, and
    /// `Nothing` when other `Tx` handles remain.
    #[inline]
    fn dec_tx(&self) -> DecrementAction {
        let previous = self.count.fetch_sub(TX_INC, Ordering::AcqRel);
        match (tx_count(previous), rx_count(previous)) {
            (1, 0) => DecrementAction::Drop,
            (1, _) => DecrementAction::Notify,
            _ => DecrementAction::Nothing,
        }
    }

    /// Registers one additional `Rx` handle (relaxed `fetch_add`).
    ///
    /// # Panics
    ///
    /// Panics with "rx count overflow", after undoing the increment, when the
    /// previous `Rx` count was at or above `OVERFLOW_PANIC`. Aborts the
    /// process when it was at or above `OVERFLOW_ABORT`.
    fn inc_rx(&self) {
        let previous = self.count.fetch_add(RX_INC, Ordering::Relaxed);
        if rx_count(previous) >= OVERFLOW_PANIC {
            self.inc_rx_overflow(previous);
        }
    }

    /// Slow path of `inc_rx`; `old` is the packed value seen before the
    /// increment. Never returns normally.
    #[cold]
    fn inc_rx_overflow(&self, old: u64) {
        if rx_count(old) >= OVERFLOW_ABORT {
            abort();
        }
        // Roll the increment back so the count stays accurate after unwinding.
        self.count.fetch_sub(RX_INC, Ordering::Relaxed);
        panic!("rx count overflow");
    }

    /// Releases one `Rx` handle and reports what the caller must do next.
    ///
    /// Returns `Drop` when this was the last `Rx` and no `Tx` remains,
    /// `Notify` when this was the last `Rx` but some `Tx` remains, and
    /// `Nothing` when other `Rx` handles remain.
    #[inline]
    fn dec_rx(&self) -> DecrementAction {
        let previous = self.count.fetch_sub(RX_INC, Ordering::AcqRel);
        match (rx_count(previous), tx_count(previous)) {
            (1, 0) => DecrementAction::Drop,
            (1, _) => DecrementAction::Notify,
            _ => DecrementAction::Nothing,
        }
    }
}
/// What the caller of `SplitCount::dec_tx` / `SplitCount::dec_rx` must do
/// after its decrement.
enum DecrementAction {
/// Other handles of the same kind remain; nothing further to do.
Nothing,
/// This was the last handle of its kind, but the other kind still has
/// handles: invoke the matching `Notify` hook.
Notify,
/// This was the last handle of either kind: free the shared allocation.
Drop,
}
/// Extracts the `Tx` count, stored in the upper 32 bits of the packed word.
fn tx_count(c: u64) -> u32 {
    let upper_half = c >> 32;
    // Lossless: after the shift only the low 32 bits can be set.
    upper_half as u32
}
/// Extracts the `Rx` count, stored in the lower 32 bits of the packed word.
fn rx_count(c: u64) -> u32 {
    // Mask first so the narrowing cast visibly discards only the `Tx` half.
    let lower_half = c & 0xFFFF_FFFF;
    lower_half as u32
}
/// Heap-allocated state shared by every `Tx` and `Rx` that originates from
/// one call to `new`.
struct Inner<T> {
// Packed `Tx`/`Rx` reference counts; decide when hooks fire and when the
// allocation is freed.
count: SplitCount,
// The user-supplied value that the handles dereference to.
data: T,
}
/// Frees the shared allocation behind `ptr`, running the destructor of the
/// contained value.
///
/// Called by the `Drop` impls only after a decrement reported
/// `DecrementAction::Drop`, i.e. when no other handle remains.
fn deallocate<T>(ptr: &NonNull<Inner<T>>) {
    let raw = ptr.as_ptr();
    // SAFETY: relies on `ptr` having been produced from a `Box` (as `new`
    // does via `Box::leak`) and on the caller being the only remaining user.
    let reclaimed = unsafe { Box::from_raw(raw) };
    drop(reclaimed);
}
/// One of the two handle kinds returned by `new`; counted in the upper half
/// of the shared `SplitCount`.
///
/// Dereferences to the shared `T`. When the last `Tx` is dropped while an
/// `Rx` still exists, `Notify::last_tx_did_drop` is invoked on the value.
///
/// The `Notify` bound sits on the struct itself because the `Drop` impl needs
/// it and a `Drop` impl may not add bounds beyond those of the type.
pub struct Tx<T: Notify> {
// Shared allocation holding the packed counter and the payload.
ptr: NonNull<Inner<T>>,
// Zero-sized marker tying the handle to `T` (presumably for ownership /
// drop-check purposes — confirm).
phantom: PhantomData<T>,
}
// SAFETY (as argued from this file): a `Tx` exposes `&T` through `Deref` and
// may run `T`'s destructor on whichever thread drops the final handle, so it
// is thread-safe only when `T` is both `Send` and `Sync` — the same bounds
// `Arc<T>` uses. The counter itself is only touched through atomics.
unsafe impl<T> Send for Tx<T> where T: Notify + Send + Sync {}
unsafe impl<T> Sync for Tx<T> where T: Notify + Send + Sync {}
impl<T: Notify> Drop for Tx<T> {
fn drop(&mut self) {
let inner = unsafe { self.ptr.as_ref() };
match inner.count.dec_tx() {
DecrementAction::Nothing => (),
DecrementAction::Notify => inner.data.last_tx_did_drop(),
DecrementAction::Drop => deallocate(&self.ptr),
}
}
}
impl<T: Notify> Clone for Tx<T> {
    /// Creates another `Tx` for the same allocation, bumping the `Tx` count.
    ///
    /// May panic or abort on count overflow (see `SplitCount::inc_tx`).
    fn clone(&self) -> Self {
        // SAFETY: `self` holds a reference, so the allocation is live.
        let counter = unsafe { &self.ptr.as_ref().count };
        // Count first: if this panics, no new handle exists that would later
        // decrement on drop.
        counter.inc_tx();
        Self {
            ptr: self.ptr,
            phantom: self.phantom,
        }
    }
}
impl<T: Notify> Deref for Tx<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&unsafe { self.ptr.as_ref() }.data
}
}
impl<T: Notify> AsRef<T> for Tx<T> {
    /// Borrows the shared value; same reference `Deref` yields.
    fn as_ref(&self) -> &T {
        &**self
    }
}
impl<T: Notify> Borrow<T> for Tx<T> {
    /// Borrows the shared value; same reference `Deref` yields.
    fn borrow(&self) -> &T {
        Deref::deref(self)
    }
}
impl<T: Notify + fmt::Debug> fmt::Debug for Tx<T> {
    /// Formats exactly like the shared value, forwarding the formatter (and
    /// therefore its flags) unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Debug>::fmt(&**self, f)
    }
}
impl<T: Notify + fmt::Display> fmt::Display for Tx<T> {
    /// Formats exactly like the shared value, forwarding the formatter (and
    /// therefore its flags) unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Display>::fmt(&**self, f)
    }
}
/// One of the two handle kinds returned by `new`; counted in the lower half
/// of the shared `SplitCount`.
///
/// Dereferences to the shared `T`. When the last `Rx` is dropped while a
/// `Tx` still exists, `Notify::last_rx_did_drop` is invoked on the value.
///
/// The `Notify` bound sits on the struct itself because the `Drop` impl needs
/// it and a `Drop` impl may not add bounds beyond those of the type.
pub struct Rx<T: Notify> {
// Shared allocation holding the packed counter and the payload.
ptr: NonNull<Inner<T>>,
// Zero-sized marker tying the handle to `T` (presumably for ownership /
// drop-check purposes — confirm).
phantom: PhantomData<T>,
}
// SAFETY (as argued from this file): an `Rx` exposes `&T` through `Deref` and
// may run `T`'s destructor on whichever thread drops the final handle, so it
// is thread-safe only when `T` is both `Send` and `Sync` — the same bounds
// `Arc<T>` uses. The counter itself is only touched through atomics.
unsafe impl<T> Send for Rx<T> where T: Notify + Send + Sync {}
unsafe impl<T> Sync for Rx<T> where T: Notify + Send + Sync {}
impl<T: Notify> Drop for Rx<T> {
fn drop(&mut self) {
let inner = unsafe { self.ptr.as_ref() };
match inner.count.dec_rx() {
DecrementAction::Nothing => (),
DecrementAction::Notify => inner.data.last_rx_did_drop(),
DecrementAction::Drop => deallocate(&self.ptr),
}
}
}
impl<T: Notify> Clone for Rx<T> {
    /// Creates another `Rx` for the same allocation, bumping the `Rx` count.
    ///
    /// May panic or abort on count overflow (see `SplitCount::inc_rx`).
    fn clone(&self) -> Self {
        // SAFETY: `self` holds a reference, so the allocation is live.
        let counter = unsafe { &self.ptr.as_ref().count };
        // Count first: if this panics, no new handle exists that would later
        // decrement on drop.
        counter.inc_rx();
        Self {
            ptr: self.ptr,
            phantom: self.phantom,
        }
    }
}
impl<T: Notify> Deref for Rx<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&unsafe { self.ptr.as_ref() }.data
}
}
impl<T: Notify> AsRef<T> for Rx<T> {
    /// Borrows the shared value; same reference `Deref` yields.
    fn as_ref(&self) -> &T {
        &**self
    }
}
impl<T: Notify> Borrow<T> for Rx<T> {
    /// Borrows the shared value; same reference `Deref` yields.
    fn borrow(&self) -> &T {
        Deref::deref(self)
    }
}
impl<T: Notify + fmt::Debug> fmt::Debug for Rx<T> {
    /// Formats exactly like the shared value, forwarding the formatter (and
    /// therefore its flags) unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Debug>::fmt(&**self, f)
    }
}
impl<T: Notify + fmt::Display> fmt::Display for Rx<T> {
    /// Formats exactly like the shared value, forwarding the formatter (and
    /// therefore its flags) unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Display>::fmt(&**self, f)
    }
}
pub fn new<T: Notify>(data: T) -> (Tx<T>, Rx<T>) {
let x = Box::new(Inner {
count: SplitCount::new(),
data,
});
let r = Box::leak(x);
(
Tx {
ptr: r.into(),
phantom: PhantomData,
},
Rx {
ptr: r.into(),
phantom: PhantomData,
},
)
}