use event_listener::{Event, EventListener, IntoNotification};
use std::sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, Weak,
};
const LOCAL_ONE: u64 = 1;
const KIDS_ONE: u64 = 1 << 32;
const LOCAL_MASK: u64 = 0xFFFF_FFFF;
#[derive(Debug, Default)]
pub(crate) struct Inner {
stop_event: Event,
zero_event: Event,
packed: AtomicU64,
stopped: AtomicBool,
swansong_count: AtomicUsize,
ancestors: Vec<Arc<Inner>>,
children: Mutex<Vec<Weak<Inner>>>,
}
impl Inner {
pub(crate) fn new_root() -> Arc<Self> {
Arc::new(Self {
swansong_count: AtomicUsize::new(1),
..Self::default()
})
}
pub(crate) fn new_child(parent: &Arc<Self>) -> Arc<Self> {
let mut ancestors = Vec::with_capacity(parent.ancestors.len() + 1);
ancestors.push(Arc::clone(parent));
ancestors.extend(parent.ancestors.iter().cloned());
let inner = Arc::new(Self {
swansong_count: AtomicUsize::new(1),
ancestors,
..Self::default()
});
parent.add_child(Arc::downgrade(&inner));
if parent.is_stopped() {
inner.stop();
}
inner
}
fn add_child(&self, child: Weak<Inner>) {
let mut children = self.children.lock().unwrap();
children.retain(|c| c.upgrade().is_some_and(|c| !c.is_complete()));
children.push(child);
}
pub(crate) fn is_stopped(&self) -> bool {
self.stopped.load(Ordering::SeqCst)
}
pub(crate) fn is_stopped_relaxed(&self) -> bool {
self.stopped.load(Ordering::Relaxed)
}
pub(crate) fn is_zero(&self) -> bool {
self.packed.load(Ordering::SeqCst) == 0
}
pub(crate) fn is_zero_relaxed(&self) -> bool {
self.packed.load(Ordering::Relaxed) == 0
}
pub(crate) fn is_complete(&self) -> bool {
self.is_stopped() && self.is_zero()
}
pub(crate) fn is_root(&self) -> bool {
self.ancestors.is_empty()
}
pub(crate) fn listen_zero(&self) -> EventListener {
self.zero_event.listen()
}
pub(crate) fn listen_stop(&self) -> EventListener {
self.stop_event.listen()
}
pub(crate) fn swansong_clone(&self) {
self.swansong_count.fetch_add(1, Ordering::SeqCst);
}
pub(crate) fn swansong_drop(&self) -> bool {
self.swansong_count.fetch_sub(1, Ordering::SeqCst) == 1
}
pub(crate) fn stop(&self) {
log::trace!("intending to stop");
if self.stopped.swap(true, Ordering::SeqCst) {
log::trace!("was already stopped");
return;
}
log::trace!("stopped");
self.stop_event.notify(usize::MAX.relaxed());
let children: Vec<Arc<Inner>> = {
let guard = self.children.lock().unwrap();
guard.iter().filter_map(Weak::upgrade).collect()
};
for child in children {
child.stop();
}
if self.is_zero() {
self.zero_event.notify(usize::MAX.relaxed());
}
}
pub(crate) fn increment_guard(&self) {
let prev = self.packed.fetch_add(LOCAL_ONE, Ordering::SeqCst);
log::trace!("incremented local: {prev:#x} -> {:#x}", prev + LOCAL_ONE);
if prev == 0 {
for ancestor in &self.ancestors {
let prev = ancestor.packed.fetch_add(KIDS_ONE, Ordering::SeqCst);
if prev != 0 {
break;
}
}
}
}
pub(crate) fn decrement_guard(&self) {
let prev = self.packed.fetch_sub(LOCAL_ONE, Ordering::SeqCst);
let new = prev - LOCAL_ONE;
log::trace!("decremented local: {prev:#x} -> {new:#x}");
if new == 0 {
if self.is_stopped() {
self.zero_event.notify(usize::MAX.relaxed());
}
for ancestor in &self.ancestors {
let prev = ancestor.packed.fetch_sub(KIDS_ONE, Ordering::SeqCst);
let new = prev - KIDS_ONE;
if new == 0 {
if ancestor.is_stopped() {
ancestor.zero_event.notify(usize::MAX.relaxed());
}
} else {
break;
}
}
}
}
pub(crate) fn guard_count_subtree(&self) -> usize {
let local = usize::try_from(self.packed.load(Ordering::Relaxed) & LOCAL_MASK)
.expect("local_guards fits in usize");
let children_sum: usize = self
.children
.lock()
.unwrap()
.iter()
.filter_map(Weak::upgrade)
.map(|c| c.guard_count_subtree())
.sum();
local + children_sum
}
}