#![doc = include_str!("../README.md")]
use std::borrow::Borrow;
use std::fmt;
use std::marker::PhantomData;
use std::ops::Deref;
use std::process::abort;
use std::ptr::NonNull;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
/// Hooks invoked on the shared value when one whole side of a
/// `Tx`/`Rx` pair goes away (see the `Drop` impls below).
pub trait Notify {
    /// Called on the shared data when the last `Tx` drops while at
    /// least one `Rx` is still alive. Default: no-op.
    fn last_tx_did_drop(&self) {}
    /// Called on the shared data when the last `Rx` drops while at
    /// least one `Tx` is still alive. Default: no-op.
    fn last_rx_did_drop(&self) {}
}
/// Both reference counts live in one `AtomicU64`: the `Tx` count in the
/// high 32 bits, the `Rx` count in the low 32 bits.
const TX_INC: u64 = 1 << 32;
const RX_INC: u64 = 1;
/// Initial packed count: exactly one `Tx` and one `Rx` (see `new`).
const RC_INIT: u64 = TX_INC + RX_INC;
/// Once a side's count passes this ramp (2^16 below the ceiling),
/// `clone` treats it as a runaway leak; the headroom absorbs racing
/// increments from other threads before the panic/abort takes effect.
const RUNAWAY_RAMP: u32 = u32::MAX - (1 << 16);
/// Hard ceiling; observing it in `clone` aborts the process.
const RUNAWAY_MAX: u32 = u32::MAX;
/// Extracts the `Tx` half (high 32 bits) of a packed count.
fn tx_count(c: u64) -> u32 {
    (c >> 32) as u32
}
/// Extracts the `Rx` half (low 32 bits) of a packed count.
fn rx_count(c: u64) -> u32 {
    c as u32
}
/// The shared heap allocation behind every `Tx`/`Rx` pair.
struct Inner<T> {
    /// Packed reference counts: `Tx` handles in the high 32 bits,
    /// `Rx` handles in the low 32 bits.
    count: AtomicU64,
    /// The user value both handle types dereference to.
    data: T,
}
/// Reclaims the `Inner` allocation created by `new`.
///
/// Must be called at most once, by whichever handle observed both
/// packed counts hit zero.
fn deallocate<T>(ptr: &NonNull<Inner<T>>) {
    // SAFETY: the pointer originates from `Box::leak` in `new`; the
    // caller guarantees no other handle can still reach it.
    let boxed = unsafe { Box::from_raw(ptr.as_ptr()) };
    drop(boxed);
}
/// The "tx" half of a shared pair created by [`new`].
///
/// Clones like an `Arc`, but only bumps its own side's count; dropping
/// the last `Tx` fires [`Notify::last_tx_did_drop`] if any `Rx`
/// survives, or frees the shared allocation otherwise.
pub struct Tx<T: Notify> {
    /// Shared refcount + data allocated in `new`.
    ptr: NonNull<Inner<T>>,
    /// Marks logical ownership of a `T` for variance / drop check.
    phantom: PhantomData<T>,
}
// SAFETY: a `Tx<T>` only ever hands out `&T`, and the shared reference
// count is an atomic, so handles may be moved to and shared between
// threads whenever `T` itself is `Send + Sync` (same reasoning as
// `Arc<T>`).
unsafe impl<T: Sync + Send + Notify> Send for Tx<T> {}
unsafe impl<T: Sync + Send + Notify> Sync for Tx<T> {}
impl<T: Notify> Drop for Tx<T> {
    fn drop(&mut self) {
        // SAFETY: holding a `Tx` keeps the allocation alive.
        let inner = unsafe { self.ptr.as_ref() };
        // AcqRel: Release pairs with the other handles' decrements;
        // Acquire makes their accesses visible before a possible free.
        let old = inner.count.fetch_sub(TX_INC, Ordering::AcqRel);
        // `old` is the pre-decrement value; anything but 1 means other
        // `Tx` handles remain and there is nothing more to do here.
        if tx_count(old) != 1 {
            return;
        }
        if rx_count(old) == 0 {
            // No handle of either kind remains: free the allocation.
            deallocate(&self.ptr)
        } else {
            // Rx side still alive: fire the notification hook.
            // NOTE(review): if the last `Rx` drops concurrently it will
            // observe tx == 0 and deallocate, racing with this borrow
            // of `data` — confirm the design rules that interleaving
            // out (e.g. via external synchronization).
            inner.data.last_tx_did_drop()
        }
    }
}
impl<T: Notify> Clone for Tx<T> {
fn clone(&self) -> Self {
let inner = unsafe { self.ptr.as_ref() };
let old = inner.count.fetch_add(TX_INC, Ordering::Relaxed);
if tx_count(old) < RUNAWAY_RAMP {
return Tx { ..*self };
}
if tx_count(old) >= RUNAWAY_MAX {
abort()
} else {
inner.count.fetch_sub(TX_INC, Ordering::Relaxed);
panic!("tx count overflow")
}
}
}
impl<T: Notify> Deref for Tx<T> {
    type Target = T;

    /// Borrows the shared value.
    fn deref(&self) -> &Self::Target {
        // SAFETY: a live handle keeps the allocation alive for at
        // least the lifetime of `&self`.
        let inner = unsafe { self.ptr.as_ref() };
        &inner.data
    }
}
impl<T: Notify> AsRef<T> for Tx<T> {
    /// Cheap borrow of the shared value, via `Deref`.
    fn as_ref(&self) -> &T {
        &**self
    }
}
impl<T: Notify> Borrow<T> for Tx<T> {
    /// Lets a `Tx<T>` be borrowed as a plain `T` (e.g. map lookups).
    fn borrow(&self) -> &T {
        &**self
    }
}
impl<T: Notify + fmt::Debug> fmt::Debug for Tx<T> {
    /// Forwards to the shared value's `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Debug>::fmt(&**self, f)
    }
}
impl<T: Notify + fmt::Display> fmt::Display for Tx<T> {
    /// Forwards to the shared value's `Display` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Display>::fmt(&**self, f)
    }
}
/// The "rx" half of a shared pair created by [`new`].
///
/// Clones like an `Arc`, but only bumps its own side's count; dropping
/// the last `Rx` fires [`Notify::last_rx_did_drop`] if any `Tx`
/// survives, or frees the shared allocation otherwise.
pub struct Rx<T: Notify> {
    /// Shared refcount + data allocated in `new`.
    ptr: NonNull<Inner<T>>,
    /// Marks logical ownership of a `T` for variance / drop check.
    phantom: PhantomData<T>,
}
// SAFETY: an `Rx<T>` only ever hands out `&T`, and the shared reference
// count is an atomic, so handles may be moved to and shared between
// threads whenever `T` itself is `Send + Sync` (same reasoning as
// `Arc<T>`).
unsafe impl<T: Sync + Send + Notify> Send for Rx<T> {}
unsafe impl<T: Sync + Send + Notify> Sync for Rx<T> {}
impl<T: Notify> Drop for Rx<T> {
    fn drop(&mut self) {
        // SAFETY: holding an `Rx` keeps the allocation alive.
        let inner = unsafe { self.ptr.as_ref() };
        // AcqRel: Release pairs with the other handles' decrements;
        // Acquire makes their accesses visible before a possible free.
        let old = inner.count.fetch_sub(RX_INC, Ordering::AcqRel);
        // `old` is the pre-decrement value; anything but 1 means other
        // `Rx` handles remain and there is nothing more to do here.
        if rx_count(old) != 1 {
            return;
        }
        if tx_count(old) == 0 {
            // No handle of either kind remains: free the allocation.
            deallocate(&self.ptr)
        } else {
            // Tx side still alive: fire the notification hook.
            // NOTE(review): if the last `Tx` drops concurrently it will
            // observe rx == 0 and deallocate, racing with this borrow
            // of `data` — confirm the design rules that interleaving
            // out (e.g. via external synchronization).
            inner.data.last_rx_did_drop()
        }
    }
}
impl<T: Notify> Clone for Rx<T> {
fn clone(&self) -> Self {
let inner = unsafe { self.ptr.as_ref() };
let old = inner.count.fetch_add(RX_INC, Ordering::Relaxed);
if rx_count(old) < RUNAWAY_RAMP {
return Rx { ..*self };
}
if rx_count(old) >= RUNAWAY_MAX {
abort()
} else {
inner.count.fetch_sub(RX_INC, Ordering::Relaxed);
panic!("rx count overflow")
}
}
}
impl<T: Notify> Deref for Rx<T> {
    type Target = T;

    /// Borrows the shared value.
    fn deref(&self) -> &Self::Target {
        // SAFETY: a live handle keeps the allocation alive for at
        // least the lifetime of `&self`.
        let inner = unsafe { self.ptr.as_ref() };
        &inner.data
    }
}
impl<T: Notify> AsRef<T> for Rx<T> {
    /// Cheap borrow of the shared value, via `Deref`.
    fn as_ref(&self) -> &T {
        &**self
    }
}
impl<T: Notify> Borrow<T> for Rx<T> {
    /// Lets an `Rx<T>` be borrowed as a plain `T` (e.g. map lookups).
    fn borrow(&self) -> &T {
        &**self
    }
}
impl<T: Notify + fmt::Debug> fmt::Debug for Rx<T> {
    /// Forwards to the shared value's `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Debug>::fmt(&**self, f)
    }
}
impl<T: Notify + fmt::Display> fmt::Display for Rx<T> {
    /// Forwards to the shared value's `Display` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Display>::fmt(&**self, f)
    }
}
pub fn new<T: Notify>(data: T) -> (Tx<T>, Rx<T>) {
let x = Box::new(Inner {
count: AtomicU64::new(RC_INIT),
data,
});
let r = Box::leak(x);
(
Tx {
ptr: r.into(),
phantom: PhantomData,
},
Rx {
ptr: r.into(),
phantom: PhantomData,
},
)
}