use std::ops::Deref;
use std::sync::{atomic, Arc};
use private::RefCounter as _;
use crate::chan::Chan;
/// A reference-counted pointer to a [`Chan`], tagged with a reference-counting
/// policy `Rc` that decides which of the channel's counters (sender or
/// receiver, strong or weak) this pointer participates in.
pub struct Ptr<A, Rc>
where
    Rc: RefCounter,
{
    // The shared channel itself.
    inner: Arc<Chan<A>>,
    // Policy token; its `make_new`/`destroy` hooks keep the channel's
    // counters in sync with clones and drops of this pointer.
    ref_counter: Rc,
}
impl<A> Ptr<A, TxStrong> {
pub fn new(inner: Arc<Chan<A>>) -> Self {
let policy = TxStrong(()).make_new(inner.as_ref());
Self {
ref_counter: policy,
inner,
}
}
}
impl<A> Ptr<A, Rx> {
    /// Wraps `inner` in a receiver pointer, registering one receiver with
    /// the channel via the `Rx` policy.
    pub fn new(inner: Arc<Chan<A>>) -> Self {
        Self {
            ref_counter: Rx(()).make_new(&inner),
            inner,
        }
    }
}
impl<A, Rc> Ptr<A, Rc>
where
    Rc: RefCounter,
{
    /// Whether this pointer's policy keeps the channel alive.
    pub fn is_strong(&self) -> bool {
        self.ref_counter.is_strong()
    }

    /// Creates a weak sender pointer to the same channel. Weak pointers do
    /// not participate in the sender count.
    pub fn to_tx_weak(&self) -> Ptr<A, TxWeak> {
        Ptr {
            ref_counter: TxWeak(()),
            inner: Arc::clone(&self.inner),
        }
    }

    /// Attempts to upgrade to a strong sender pointer, returning `None` when
    /// the sender count has already dropped to zero.
    pub fn try_to_tx_strong(&self) -> Option<Ptr<A, TxStrong>> {
        let ref_counter = TxStrong::try_new(&self.inner)?;
        Some(Ptr {
            inner: Arc::clone(&self.inner),
            ref_counter,
        })
    }

    /// Address of the shared channel, usable as an identity for pointer
    /// comparisons.
    pub fn inner_ptr(&self) -> *const () {
        Arc::as_ptr(&self.inner).cast()
    }

    /// Current number of strong senders attached to the channel.
    pub fn sender_count(&self) -> usize {
        self.inner.sender_count.load(atomic::Ordering::SeqCst)
    }

    /// Current number of receivers attached to the channel.
    pub fn receiver_count(&self) -> usize {
        self.inner.receiver_count.load(atomic::Ordering::SeqCst)
    }
}
impl<A, Rc> Ptr<A, Rc>
where
Rc: RefCounter + Into<TxEither>,
{
pub fn to_tx_either(&self) -> Ptr<A, TxEither> {
Ptr {
inner: self.inner.clone(),
ref_counter: self.ref_counter.make_new(&self.inner).into(),
}
}
}
impl<A, Rc> Clone for Ptr<A, Rc>
where
Rc: RefCounter,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
ref_counter: self.ref_counter.make_new(self.inner.as_ref()),
}
}
}
impl<A, Rc> Drop for Ptr<A, Rc>
where
    Rc: RefCounter,
{
    fn drop(&mut self) {
        // Let the policy undo whatever counter increment `make_new` performed
        // for this pointer (a no-op for weak policies).
        self.ref_counter.destroy(self.inner.as_ref())
    }
}
/// Dereferences straight to the shared channel so a `Ptr` can be used
/// anywhere a `&Chan<A>` is expected.
impl<A, Rc> Deref for Ptr<A, Rc>
where
    Rc: RefCounter,
{
    type Target = Chan<A>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
/// Marker for a strong sender reference: each live `TxStrong` is reflected in
/// the channel's `sender_count`.
#[derive(Debug)]
pub struct TxStrong(());
impl TxStrong {
    /// Attempts to register a new strong sender, returning `None` if the
    /// sender count has already reached zero — in that case the channel may
    /// be shutting down and must not be revived.
    fn try_new<A>(inner: &Chan<A>) -> Option<TxStrong> {
        use std::sync::atomic::Ordering::{Acquire, Relaxed};

        // CAS loop: only increment while the observed count is non-zero, so
        // a count that has hit zero stays at zero.
        let mut current = inner.sender_count.load(Relaxed);
        while current != 0 {
            let cas = inner
                .sender_count
                .compare_exchange_weak(current, current + 1, Acquire, Relaxed);
            match cas {
                Ok(_) => return Some(TxStrong(())),
                // Spurious failure or contention: retry with the fresh value.
                Err(actual) => current = actual,
            }
        }
        None
    }
}
/// Marker for a weak sender reference: it never touches `sender_count` and so
/// does not keep the channel alive.
#[derive(Debug)]
pub struct TxWeak(());

/// Runtime-chosen sender strength, for pointers that may be either strong or
/// weak.
#[derive(Debug)]
pub enum TxEither {
    Strong(TxStrong),
    Weak(TxWeak),
}

/// Marker for a receiver reference, tracked via the channel's
/// `receiver_count`.
pub struct Rx(());

/// Sealed trait over the reference-counting policies usable with [`Ptr`];
/// the actual counting hooks live on the private supertrait, so no type
/// outside this module can implement it.
pub trait RefCounter: Send + Sync + 'static + Unpin + private::RefCounter {}

impl RefCounter for TxStrong {}
impl RefCounter for TxWeak {}
impl RefCounter for TxEither {}
impl RefCounter for Rx {}
impl From<TxStrong> for TxEither {
fn from(strong: TxStrong) -> Self {
TxEither::Strong(strong)
}
}
impl From<TxWeak> for TxEither {
fn from(weak: TxWeak) -> Self {
TxEither::Weak(weak)
}
}
mod private {
    use super::*;

    /// Counting hooks backing the public, sealed `RefCounter` trait. Each
    /// policy decides how clones and drops of a `Ptr` show up in the
    /// channel's counters.
    pub trait RefCounter {
        /// Produces a fresh policy token, notifying the channel so the
        /// relevant counter is incremented (if the policy has one).
        fn make_new<A>(&self, chan: &Chan<A>) -> Self;
        /// Releases this token, notifying the channel so the relevant
        /// counter is decremented.
        fn destroy<A>(&self, chan: &Chan<A>);
        /// Whether this policy keeps the channel alive.
        fn is_strong(&self) -> bool;
    }

    impl RefCounter for TxStrong {
        fn make_new<A>(&self, chan: &Chan<A>) -> Self {
            chan.on_sender_created();
            TxStrong(())
        }

        fn destroy<A>(&self, chan: &Chan<A>) {
            chan.on_sender_dropped();
        }

        fn is_strong(&self) -> bool {
            true
        }
    }

    impl RefCounter for TxWeak {
        // Weak senders are invisible to the channel's counters.
        fn make_new<A>(&self, _chan: &Chan<A>) -> Self {
            TxWeak(())
        }

        fn destroy<A>(&self, _chan: &Chan<A>) {}

        fn is_strong(&self) -> bool {
            false
        }
    }

    impl RefCounter for TxEither {
        // Delegate every hook to whichever side this value currently holds.
        fn make_new<A>(&self, chan: &Chan<A>) -> Self {
            match self {
                TxEither::Strong(s) => TxEither::Strong(s.make_new(chan)),
                TxEither::Weak(w) => TxEither::Weak(w.make_new(chan)),
            }
        }

        fn destroy<A>(&self, chan: &Chan<A>) {
            match self {
                TxEither::Strong(s) => s.destroy(chan),
                TxEither::Weak(w) => w.destroy(chan),
            }
        }

        fn is_strong(&self) -> bool {
            matches!(self, TxEither::Strong(_))
        }
    }

    impl RefCounter for Rx {
        fn make_new<A>(&self, chan: &Chan<A>) -> Self {
            chan.on_receiver_created();
            Rx(())
        }

        fn destroy<A>(&self, chan: &Chan<A>) {
            chan.on_receiver_dropped();
        }

        // A receiver always keeps the channel alive.
        fn is_strong(&self) -> bool {
            true
        }
    }
}
#[cfg(test)]
mod tests {
    use std::mem::size_of;

    use super::*;

    // `Ptr` should cost no more than the Arc it wraps for zero-sized
    // policies; `TxEither` adds one word for its discriminant/payload.
    #[test]
    fn size_of_ptr() {
        assert_eq!(size_of::<Ptr<Foo, TxStrong>>(), 8);
        assert_eq!(size_of::<Ptr<Foo, TxWeak>>(), 8);
        assert_eq!(size_of::<Ptr<Foo, Rx>>(), 8);
        assert_eq!(size_of::<Ptr<Foo, TxEither>>(), 16);
    }

    // Constructing the first strong pointer registers exactly one sender.
    #[test]
    fn starts_with_rc_count_one() {
        let inner = Arc::new(Chan::new(None));
        let _ptr1 = Ptr::<Foo, TxStrong>::new(inner.clone());

        assert_eq!(inner.sender_count.load(atomic::Ordering::SeqCst), 1)
    }

    // Each clone of a strong pointer bumps the sender count by one.
    #[test]
    fn clone_increments_count() {
        let inner = Arc::new(Chan::new(None));
        let ptr1 = Ptr::<Foo, TxStrong>::new(inner.clone());
        #[allow(clippy::redundant_clone)]
        let _ptr2 = ptr1.clone();

        assert_eq!(inner.sender_count.load(atomic::Ordering::SeqCst), 2)
    }

    // Dropping the last strong pointer brings the sender count back to zero.
    #[test]
    fn dropping_last_reference_calls_on_last_drop() {
        let inner = Arc::new(Chan::new(None));
        let ptr1 = Ptr::<Foo, TxStrong>::new(inner.clone());

        std::mem::drop(ptr1);

        assert_eq!(inner.sender_count.load(atomic::Ordering::SeqCst), 0);
    }

    // A weak pointer does not contribute to the sender count: only the
    // original strong pointer is counted.
    #[test]
    fn can_convert_tx_strong_into_weak() {
        let inner = Arc::new(Chan::new(None));
        let strong_ptr = Ptr::<Foo, TxStrong>::new(inner.clone());

        let _weak_ptr = strong_ptr.to_tx_weak();

        assert_eq!(inner.sender_count.load(atomic::Ordering::SeqCst), 1);
    }

    // Count of 3: the original strong pointer, the either pointer made from
    // it, and the clone of that either pointer.
    #[test]
    fn can_clone_either() {
        let inner = Arc::new(Chan::new(None));
        let strong_ptr = Ptr::<Foo, TxStrong>::new(inner.clone());

        let either_ptr_1 = strong_ptr.to_tx_either();
        #[allow(clippy::redundant_clone)]
        let _either_ptr_2 = either_ptr_1.clone();

        assert_eq!(inner.sender_count.load(atomic::Ordering::SeqCst), 3);
    }

    // An either pointer created from a strong pointer reports itself strong.
    #[test]
    fn either_is_strong() {
        let inner = Arc::new(Chan::new(None));
        let strong_ptr = Ptr::<Foo, TxStrong>::new(inner);

        let either_ptr = strong_ptr.to_tx_either();

        assert!(either_ptr.is_strong());
    }

    // Minimal actor type used only to instantiate `Chan<Foo>` in these tests.
    struct Foo;

    impl crate::Actor for Foo {
        type Stop = ();

        async fn stopped(self) -> Self::Stop {}
    }
}