use crate::sync::RwLock;
use maybe_owned::MaybeOwned;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "crossbeam-utils")]
use crossbeam_utils::thread;
struct FnCell<T> {
f: Box<dyn Fn(MaybeOwned<'_, T>) -> bool + Send + Sync>,
alive: AtomicBool,
}
impl<T> FnCell<T> {
fn new<F>(f: F) -> Self
where
F: Fn(MaybeOwned<'_, T>) -> bool + Send + Sync + 'static,
{
FnCell {
f: Box::new(f),
alive: AtomicBool::new(true),
}
}
fn call(&self, arg: MaybeOwned<'_, T>) -> bool {
if self.alive.load(Ordering::Relaxed) {
let is_alive = (self.f)(arg);
if !is_alive {
self.alive.store(false, Ordering::Relaxed);
}
is_alive
} else {
false
}
}
fn is_alive(&self) -> bool {
self.alive.load(Ordering::Relaxed)
}
}
impl<T> fmt::Debug for FnCell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"FnCell {{ f: Fn@{:p}, alive: {:?} }}",
self.f, self.alive
)
}
}
#[derive(Debug)]
pub struct Callbacks<T> {
fs: RwLock<Vec<FnCell<T>>>,
}
impl<T> Callbacks<T> {
pub fn new() -> Self {
Default::default()
}
pub fn push<F>(&self, cb: F)
where
F: Fn(MaybeOwned<'_, T>) -> bool + Send + Sync + 'static,
{
self.fs.write().push(FnCell::new(cb))
}
pub fn call_owned(&self, arg: T) {
let fs = self.fs.read();
let n = fs.len();
let mut i = 0;
let mut all_alive = true;
for _ in 1..n {
all_alive &= fs[i].call(MaybeOwned::Borrowed(&arg));
i += 1;
}
if n > 0 {
all_alive &= fs[i].call(MaybeOwned::Owned(arg));
}
drop(fs);
if !all_alive {
self.cleanup();
}
}
pub fn call_ref(&self, arg: &T) {
let all_alive = self
.fs
.read()
.iter()
.map(|f| f.call(MaybeOwned::Borrowed(arg)))
.fold(true, |a, alive| a & alive);
if !all_alive {
self.cleanup();
}
}
#[inline]
pub fn call<'a>(&self, arg: impl Into<MaybeOwned<'a, T>>)
where
T: 'a,
{
match arg.into() {
MaybeOwned::Owned(v) => self.call_owned(v),
MaybeOwned::Borrowed(r) => self.call_ref(r),
}
}
#[cfg(feature = "crossbeam-utils")]
pub fn call_parallel(&self, arg: &T)
where
T: Sync,
{
let fs = self.fs.read();
let n = fs.len();
if n == 0 {
return;
}
if n == 1 {
if !fs[0].call(MaybeOwned::Borrowed(arg)) {
self.cleanup();
}
return;
}
let all_alive = AtomicBool::new(true);
thread::scope(|scope| {
let all_alive = &all_alive;
let mut i = 0;
for _ in 1..n {
let f = &fs[i];
scope.spawn(move |_| {
if !f.call(MaybeOwned::Borrowed(arg)) {
all_alive.store(false, Ordering::Relaxed);
}
});
i += 1;
}
if !fs[i].call(MaybeOwned::Borrowed(arg)) {
all_alive.store(false, Ordering::Relaxed);
}
})
.unwrap();
drop(fs);
if all_alive.load(Ordering::Relaxed) {
self.cleanup();
}
}
fn cleanup(&self) {
if let Some(mut fs) = self.fs.try_write() {
fs.retain(FnCell::is_alive);
}
}
#[cfg(test)]
pub(crate) fn len(&self) -> usize {
self.fs.read().len()
}
}
impl<T> Default for Callbacks<T> {
#[inline]
fn default() -> Self {
Self {
fs: Default::default(),
}
}
}