use std::mem::ManuallyDrop;
use std::sync::{
atomic::{AtomicPtr, Ordering},
Arc, Weak,
};
use std::thread::Thread;
/// Sentinel whose *address* is used as a marker value in the shared `AtomicPtr<Thread>` slot.
///
/// `Unparker::unpark` swaps this address into the slot, and both wake-up paths compare the
/// slot's previous value against it before dereferencing anything. While the slot holds this
/// address, the compare-exchange from null in `Parker::park` fails, so that call does not block.
///
/// The `MaybeUninit` is never initialized or read in the visible code; only its address matters.
static mut SKIP_PARKING: std::mem::MaybeUninit<Thread> = std::mem::MaybeUninit::uninit();
/// Waking half of the pair: a wrapped value of type `S` plus a strong handle to the
/// pointer slot shared with the matching [`Parker`].
///
/// Cloning clones the wrapped value and adds another strong reference to the same slot.
#[derive(Clone, Debug, Default)]
pub struct Unparker<S>(
    /// The wrapped value (a sender, in the constructor's terminology).
    S,
    /// Strong reference to the shared slot. Kept in `ManuallyDrop` so that it is only
    /// released by code that explicitly takes it out.
    ManuallyDrop<Arc<AtomicPtr<Thread>>>,
);
/// Blocking half of the pair. It holds only a weak handle to the shared pointer slot, so
/// it does not by itself keep the slot alive.
#[derive(Debug)]
pub struct Parker(
    /// Weak reference to the `AtomicPtr<Thread>` slot.
    Weak<AtomicPtr<Thread>>,
);
/// Creates a connected [`Unparker`] / [`Parker`] pair around `sender`.
///
/// Both halves refer to one freshly allocated slot that starts out null: the `Unparker`
/// owns a strong reference to it, the `Parker` a weak one.
pub fn new<S>(sender: S) -> (Unparker<S>, Parker) {
    // `AtomicPtr::default()` is a null pointer.
    let slot = Arc::new(AtomicPtr::default());
    let parker = Parker(Arc::downgrade(&slot));
    let unparker = Unparker(sender, ManuallyDrop::new(slot));
    (unparker, parker)
}
impl<S> Drop for Unparker<S> {
fn drop(&mut self) {
let Some(arc) = Arc::into_inner(unsafe { ManuallyDrop::take(&mut self.1) }) else {
return;
};
let ptr = arc.into_inner();
if ptr != unsafe { SKIP_PARKING.as_mut_ptr() } {
if let Some(th) = unsafe { ptr.as_ref() } {
th.clone().unpark();
}
}
}
}
impl<S> Unparker<S> {
    /// Wakes the thread registered in the shared slot, if any, and leaves the
    /// `SKIP_PARKING` sentinel in the slot.
    ///
    /// With the sentinel in place, the compare-exchange from null in `Parker::park` fails,
    /// so that call does not block.
    pub fn unpark(&self) {
        // SAFETY:
        // * `addr_of_mut!` only computes the sentinel's address. Unlike `as_mut_ptr()`, it
        //   never forms a `&mut` to the `static mut`, which concurrent callers may be
        //   addressing at the same time. The static itself is never read or written.
        // * In the visible code, a pointer that is neither null nor the sentinel was stored
        //   by `Parker::park` and refers to a `Thread` local of that call.
        // NOTE(review): nothing visible here keeps that stack frame alive if `park` returns
        // (e.g. after a spurious wake-up) between the swap and the dereference — confirm.
        unsafe {
            let skip = std::ptr::addr_of_mut!(SKIP_PARKING).cast::<Thread>();
            // Replace whatever is registered with the sentinel and inspect the old value.
            let previous = self.1.swap(skip, Ordering::AcqRel);
            if previous != skip {
                if let Some(th) = previous.as_ref() {
                    th.clone().unpark();
                }
            }
        }
    }
}
impl<S: super::Sender> super::Sender for Unparker<S> {
    type Value = <S as super::Sender>::Value;

    /// Forwards `value` to the wrapped sender and returns its result unchanged.
    ///
    /// Unless the wrapped sender answered `Break(Sent::Closed)`, the parker is
    /// woken via [`Unparker::unpark`] before returning.
    fn send(&mut self, value: Self::Value) -> std::ops::ControlFlow<super::Sent> {
        use std::ops::ControlFlow;

        let outcome = self.0.send(value);
        let closed = ControlFlow::Break(super::Sent::Closed);
        if outcome != closed {
            self.unpark();
        }
        outcome
    }

    /// Delegates to the wrapped sender's `may_send`.
    fn may_send(&self) -> bool {
        let inner = &self.0;
        inner.may_send()
    }
}
impl Parker {
    /// Parks the calling thread until it is woken, unless there is nothing to wait for.
    ///
    /// Returns without blocking when
    /// * the shared slot is already gone (the weak handle cannot be upgraded),
    /// * the slot is not null (for example it holds the skip sentinel left by an earlier
    ///   `Unparker::unpark`); the slot is then reset to null, or
    /// * this call turns out to hold the last strong reference right after registering.
    ///
    /// Otherwise a pointer to this call's `Thread` handle is published in the slot and the
    /// thread parks. After waking, the slot is reset to null if it still exists.
    ///
    /// NOTE(review): `std::thread::park` may return spuriously, and the published pointer
    /// refers to a local of this call. Callers presumably re-check their own condition, but
    /// a waker that has already read the pointer could outlive this frame — confirm.
    pub fn park(&self) {
        let Some(slot) = self.0.upgrade() else {
            return;
        };
        let mut me = std::thread::current();
        // Register only if nobody left a marker or pointer in the slot.
        let registered = slot
            .compare_exchange(
                std::ptr::null_mut(),
                &mut me as *mut Thread,
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .is_ok();
        if registered {
            // Give up our temporary strong reference before blocking. If it was the last
            // one, no other strong handle is left to wake us.
            if Arc::into_inner(slot).is_some() {
                return;
            }
            std::thread::park();
        }
        // Clear the slot (our pointer or the skip sentinel) for the next call.
        if let Some(slot) = self.0.upgrade() {
            slot.store(std::ptr::null_mut(), Ordering::Release);
        }
    }
}