#![deny(dead_code, unused_imports)]
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
/// An action that a [`Dropkick`] guard runs on its target when the guard is
/// dropped without having been countered.
///
/// The method takes `self` by value, so the action can run at most once per
/// value.
pub trait DropkickSync {
/// Consumes `self` and performs the drop-time action.
fn dropkick(self);
}
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct Dropkick<T: DropkickSync> {
inner: Option<T>,
}
impl<T> Dropkick<T>
where
T: DropkickSync,
{
pub fn new(target: T) -> Self {
Self {
inner: Some(target),
}
}
pub fn callback<R>(callback_fn: T) -> Self
where
T: FnOnce() -> R,
{
Self::new(callback_fn)
}
pub fn counter(mut self) {
self.inner.take();
drop(self);
}
pub fn counter_take(mut self) -> T {
let value = self
.inner
.take()
.expect("Dropkick dropped before countered?");
drop(self);
value
}
}
impl<T> Dropkick<Arc<Mutex<Option<T>>>>
where
    T: DropkickSync,
    Arc<Mutex<Option<T>>>: DropkickSync,
{
    /// Removes the target from the shared mutex without consuming the guard.
    ///
    /// Returns `None` when the guard's slot is empty, when the mutex is
    /// poisoned, or when the mutex already holds `None`. After a successful
    /// take the mutex holds `None`.
    pub fn counter_take_mutex(&self) -> Option<T> {
        let shared = self.inner.as_ref()?;
        // `.ok()` turns a poisoned lock into `None`; the guard carried by the
        // poison error is released as that error is dropped.
        let mut slot = shared.lock().ok()?;
        slot.take()
    }
}
/// Shared access to the guarded target.
impl<T> ::std::ops::Deref for Dropkick<T>
where
    T: DropkickSync,
{
    type Target = T;

    /// Borrows the target.
    ///
    /// # Panics
    ///
    /// Panics if the slot is empty.
    fn deref(&self) -> &Self::Target {
        let target: Option<&T> = self.inner.as_ref();
        target.expect("Dropkick dropped before countered?")
    }
}
/// Exclusive access to the guarded target.
impl<T> ::std::ops::DerefMut for Dropkick<T>
where
    T: DropkickSync,
{
    /// Mutably borrows the target.
    ///
    /// # Panics
    ///
    /// Panics if the slot is empty.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let target: Option<&mut T> = self.inner.as_mut();
        target.expect("Dropkick dropped before countered?")
    }
}
/// Dropping an armed guard fires the target's `dropkick`.
impl<T> Drop for Dropkick<T>
where
    T: DropkickSync,
{
    fn drop(&mut self) {
        // Move the target out, leaving `None` behind. A guard that was
        // already emptied by `counter`/`counter_take` has nothing to kick.
        let armed = std::mem::take(&mut self.inner);
        if let Some(target) = armed {
            target.dropkick();
        }
    }
}
/// A bare `broadcast::Sender<()>` sends the unit value when kicked.
impl DropkickSync for ::tokio::sync::broadcast::Sender<()> {
    fn dropkick(self) {
        // Delegate to the `(value, sender)` implementation with `()` as payload.
        <((), Self) as DropkickSync>::dropkick(((), self));
    }
}
/// A `(value, broadcast::Sender)` pair sends `value` on the sender when kicked.
impl<T> DropkickSync for (T, ::tokio::sync::broadcast::Sender<T>) {
    fn dropkick(self) {
        let (payload, tx) = self;
        // Best effort: whatever `send` reports is deliberately discarded.
        let _ = tx.send(payload);
    }
}
/// A bare `mpsc::UnboundedSender<()>` sends the unit value when kicked.
impl DropkickSync for ::tokio::sync::mpsc::UnboundedSender<()> {
    fn dropkick(self) {
        // Delegate to the `(value, sender)` implementation with `()` as payload.
        <((), Self) as DropkickSync>::dropkick(((), self));
    }
}
/// A `(value, mpsc::UnboundedSender)` pair sends `value` on the sender when kicked.
impl<T> DropkickSync for (T, ::tokio::sync::mpsc::UnboundedSender<T>) {
    fn dropkick(self) {
        let (payload, tx) = self;
        // Best effort: whatever `send` reports is deliberately discarded.
        let _ = tx.send(payload);
    }
}
/// A bare `oneshot::Sender<()>` sends the unit value when kicked.
impl DropkickSync for ::tokio::sync::oneshot::Sender<()> {
    fn dropkick(self) {
        // Delegate to the `(value, sender)` implementation with `()` as payload.
        <((), Self) as DropkickSync>::dropkick(((), self));
    }
}
/// A `(value, oneshot::Sender)` pair sends `value` on the sender when kicked.
impl<T> DropkickSync for (T, ::tokio::sync::oneshot::Sender<T>) {
    fn dropkick(self) {
        let (payload, tx) = self;
        // Best effort: whatever `send` reports is deliberately discarded.
        let _ = tx.send(payload);
    }
}
/// A bare `watch::Sender<()>` sends the unit value when kicked.
impl DropkickSync for ::tokio::sync::watch::Sender<()> {
    fn dropkick(self) {
        // Delegate to the `(value, sender)` implementation with `()` as payload.
        <((), Self) as DropkickSync>::dropkick(((), self));
    }
}
/// A `(value, watch::Sender)` pair sends `value` on the sender when kicked.
impl<T> DropkickSync for (T, ::tokio::sync::watch::Sender<T>) {
    fn dropkick(self) {
        let (payload, tx) = self;
        // Best effort: whatever `send` reports is deliberately discarded.
        let _ = tx.send(payload);
    }
}
/// A `CancellationToken` is cancelled when kicked, unless it already reports
/// itself as cancelled.
impl DropkickSync for ::tokio_util::sync::CancellationToken {
    fn dropkick(self) {
        if self.is_cancelled() {
            return;
        }
        self.cancel();
    }
}
/// Any zero-argument `FnOnce` closure or function is run when kicked.
impl<F, R> DropkickSync for F
where
    F: FnOnce() -> R,
{
    fn dropkick(self) {
        // The callback's return value is not used.
        let _ = self();
    }
}
impl<T> DropkickSync for Option<T>
where
T: DropkickSync,
{
fn dropkick(mut self) {
if let Some(inner) = self.take() {
DropkickSync::dropkick(inner);
}
}
}
impl<T> DropkickSync for std::sync::Mutex<T>
where
T: DropkickSync,
{
fn dropkick(self) {
if let Ok(lock) = self.into_inner() {
DropkickSync::dropkick(lock);
}
}
}
/// Process-wide counter handed to `dropkick_mutex_background_task`, which
/// increments it when a background cleanup starts and decrements it when the
/// cleanup finishes. It is only read back for trace logging.
static DROPKICK_MUTEX_OUTSTANDING_COUNT: AtomicUsize = AtomicUsize::new(0);
/// Blocking cleanup used when a shared `Mutex<Option<T>>` was contended at
/// drop time.
///
/// Waits for the lock, takes whatever target is still stored, and kicks it.
/// `counter` tracks how many of these cleanups are in flight; it is bumped on
/// entry and restored on exit, including when the body unwinds.
///
/// # Panics
///
/// Panics if the mutex is poisoned, or if the target's `dropkick` panics.
fn dropkick_mutex_background_task<T>(counter: &'static AtomicUsize, mutarc: Arc<Mutex<Option<T>>>)
where
    T: DropkickSync,
{
    /// Restores the in-flight counter if the cleanup unwinds before reaching
    /// its normal epilogue (poisoned lock or a panicking `dropkick`).
    struct UnwindDecrement(&'static AtomicUsize);

    impl Drop for UnwindDecrement {
        fn drop(&mut self) {
            self.0.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
        }
    }

    tracing::trace_span!("Dropkick Mutex background cleanup").in_scope(|| {
        let active_drop_count = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
        // From here on the increment must be undone on every exit path.
        let unwind_guard = UnwindDecrement(counter);
        tracing::trace!(
            "Dropkick Mutex background cleanup thread started due to contention, active count: {}",
            active_drop_count,
        );
        // The lock guard is a temporary of this statement, so the mutex is
        // released again before the target's `dropkick` runs.
        let res = mutarc.lock().expect("Dropkick Mutex poisoned").take();
        <Option<T> as DropkickSync>::dropkick(res);
        // Normal completion: defuse the unwind guard and decrement here so the
        // remaining count can be reported.
        std::mem::forget(unwind_guard);
        let active_drop_count = counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed) - 1;
        tracing::trace!(
            "Dropkick Mutex background cleanup thread completed, remaining active count: {}",
            active_drop_count,
        );
    });
}
/// A shared, lockable, optional target.
///
/// Kicking tries three strategies in order:
///
/// 1. If this is the only `Arc` handle, unwrap it and kick the owned mutex
///    directly.
/// 2. Otherwise, if the mutex can be locked without waiting, take the target
///    out and kick it on the current thread after releasing the lock.
/// 3. Otherwise (the lock is held elsewhere), hand the `Arc` to a background
///    task that blocks on the lock. Inside a Tokio runtime this uses the
///    runtime's blocking pool; outside one it falls back to a dedicated OS
///    thread, so dropping a guard never panics merely because no runtime is
///    active.
///
/// # Panics
///
/// Panics if the mutex is found poisoned in step 2.
impl<T> DropkickSync for std::sync::Arc<std::sync::Mutex<Option<T>>>
where
    T: Send + DropkickSync + 'static,
{
    fn dropkick(self) {
        // Step 1: sole owner, no locking required.
        let this = match Arc::try_unwrap(self) {
            Ok(inner) => return <Mutex<Option<T>> as DropkickSync>::dropkick(inner),
            Err(this) => this,
        };

        // Step 2: uncontended lock. `Some(..)` means the lock was acquired
        // (the inner option may still be `None` if the target was already
        // taken); `None` means the lock is currently held elsewhere.
        let taken: Option<Option<T>> = match this.try_lock() {
            Ok(mut lock) => {
                let target = lock.take();
                // Release the lock before any user code runs.
                drop(lock);
                Some(target)
            }
            Err(std::sync::TryLockError::Poisoned(poisoned)) => {
                panic!("Dropkick Lock was poisoned: {:?}", poisoned)
            }
            Err(std::sync::TryLockError::WouldBlock) => None,
        };

        match taken {
            Some(target) => <Option<T> as DropkickSync>::dropkick(target),
            None => {
                // Step 3: wait for the lock somewhere that is allowed to block.
                let cleanup = move || {
                    dropkick_mutex_background_task(&DROPKICK_MUTEX_OUTSTANDING_COUNT, this)
                };
                // `tokio::task::spawn_blocking` panics without a runtime, so
                // probe for one first. Both join handles are dropped, which
                // detaches the work.
                match tokio::runtime::Handle::try_current() {
                    Ok(runtime) => drop(runtime.spawn_blocking(cleanup)),
                    Err(_) => drop(std::thread::spawn(cleanup)),
                }
            }
        }
    }
}
/// Any kickable value converts into an armed guard, enabling `value.into()`.
impl<T> From<T> for Dropkick<T>
where
    T: DropkickSync,
{
    fn from(target: T) -> Self {
        Self {
            inner: Some(target),
        }
    }
}
// Unit tests for `Dropkick`: natural drop fires the target, countering
// suppresses it, and each of the three `Arc<Mutex<Option<T>>>` drop paths
// eventually runs the stored callback.
#[cfg(test)]
mod tests {
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use super::{Dropkick, DropkickSync};
// Minimal kickable target: a flag that is set to `true` when kicked.
#[repr(transparent)]
struct DropkickFlag(pub bool);
// Implemented on `&mut DropkickFlag` so the test can inspect the flag after
// the guard holding the borrow has been dropped.
impl DropkickSync for &mut DropkickFlag {
fn dropkick(self) {
self.0 = true;
}
}
// Dropping an armed guard must kick the target.
#[test]
fn dropkick_notifies() {
let mut m = DropkickFlag(false);
drop(Dropkick::new(&mut m));
assert!(
m.0,
"Dropkick must call drop_kick when allowed to drop naturally"
);
}
// `counter` consumes the guard without kicking the target.
#[test]
fn dropkick_consumable() {
let mut m = DropkickFlag(false);
Dropkick::new(&mut m).counter();
assert!(!m.0, "Dropkick must not call drop_kick when consumed");
}
// Same as `dropkick_notifies`, but with a closure target built via `callback`.
#[test]
fn dropkick_callback_notifies() {
let mut m = false;
drop(Dropkick::callback(|| m = true));
assert!(
m,
"Callback Dropkick must call drop_kick when allowed to drop naturally"
);
}
// Same as `dropkick_consumable`, but with a closure target.
#[test]
fn dropkick_callback_consumable() {
let mut m = false;
Dropkick::callback(|| m = true).counter();
assert!(
!m,
"Callback Dropkick must not call drop_kick when consumed"
);
}
// The guard holds the only `Arc` handle, so the `Arc::try_unwrap` branch of
// the `Arc<Mutex<Option<T>>>` implementation is the one expected to run the
// callback, synchronously, during `drop`.
#[test]
fn dropkick_exclusive_arc_try_unwrap_optimization() {
let m = Arc::new(AtomicBool::new(false));
let target_arcmutopt: Arc<Mutex<_>> = Arc::new(Mutex::new(Some({
let m = m.clone();
move || m.store(true, std::sync::atomic::Ordering::Relaxed)
})));
let dropkick = Dropkick::new(target_arcmutopt);
assert!(
!m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick must not execute until dropped"
);
drop(dropkick);
assert!(
m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick must call drop_kick"
);
}
// A second `Arc` handle is kept alive, so `Arc::try_unwrap` cannot succeed;
// the mutex itself is not locked, so the `try_lock` branch is expected to run
// the callback synchronously during `drop`.
#[test]
fn dropkick_exclusive_mutex_try_lock_optimization() {
let m = Arc::new(AtomicBool::new(false));
let target_arcmutopt: Arc<Mutex<_>> = Arc::new(Mutex::new(Some({
let m = m.clone();
move || m.store(true, std::sync::atomic::Ordering::Relaxed)
})));
let secondary_hold: Arc<Mutex<_>> = Arc::clone(&target_arcmutopt);
let dropkick = Dropkick::new(target_arcmutopt);
assert!(
!m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick must not execute until dropped"
);
drop(dropkick);
assert!(
m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick must call drop_kick"
);
// Only released here so the second handle outlives the guard's drop.
drop(secondary_hold);
}
// The mutex is locked through a second handle while the guard is dropped, so
// neither synchronous branch can run the callback; it must run later from the
// background task once the lock is released.
#[tokio::test]
async fn dropkick_mutex_background_task() {
let m = Arc::new(AtomicBool::new(false));
let target_arcmutopt = Arc::new(Mutex::new(Some({
let m = m.clone();
move || m.store(true, std::sync::atomic::Ordering::Relaxed)
})));
let secondary_hold: Arc<Mutex<_>> = Arc::clone(&target_arcmutopt);
let dropkick = Dropkick::new(target_arcmutopt);
// Hold the lock across the guard's drop to force the contended path.
let held_lock = secondary_hold.lock().unwrap();
assert!(
!m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick must not execute until dropped"
);
drop(dropkick);
assert!(
!m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick cannot have dropped while still locked"
);
drop(held_lock);
// Poll for the background task: up to 100 checks, sleeping 10 ms after
// each unsuccessful one.
for _ in 0..100 {
if m.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
assert!(
m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick must call drop_kick on background thread after it is unlocked"
);
}
// `counter_take_mutex` removes the closure from the mutex; the closure is
// dropped uncalled, and the later drop of the guard finds nothing to run.
#[test]
fn dropkick_counter_take_mutex() {
let m = Arc::new(AtomicBool::new(false));
let target_arcmutopt: Arc<Mutex<_>> = Arc::new(Mutex::new(Some({
let m = m.clone();
move || m.store(true, std::sync::atomic::Ordering::Relaxed)
})));
let dropkick = Dropkick::new(target_arcmutopt);
drop(dropkick.counter_take_mutex());
assert!(
!m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick::counter_take_mutex must not invoke dropkick event"
);
drop(dropkick);
assert!(
!m.load(std::sync::atomic::Ordering::Relaxed),
"Dropkick must not invoke counter_take_mutex-removed target"
);
}
}