#![forbid(unsafe_code)]
#![allow(clippy::needless_lifetimes)] #![allow(clippy::option_map_unit_fn)]
#![doc = include_str!("../README.md")]
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Waker, Poll, Poll::*};
use pin_project_lite::pin_project;
#[cfg(test)]
mod test;
/// Condition variable for use from async code.
///
/// All of the state is a `CV` (the list of waiters) kept behind a
/// `parking_lot::Mutex`.  Waiters register themselves with
/// `wait_baton` / `wait` / `wait_no_relock`; they are woken with
/// `notify_one` and `notify_all`.
#[derive(Debug,Default)]
pub struct Condvar(parking_lot::Mutex<CV>);
// `Waiter` is the future returned by `Condvar::wait_baton`.
//
// It is a thin wrapper around the `WaitState` state machine; the field is
// `#[pin]` because one of the states contains the (possibly `!Unpin`)
// relock future `G::LockFuture`.
pin_project!{
pub struct Waiter<'c,G> where G: RelockMutexGuard {
#[pin] waitstate: WaitState<'c,G>,
}}
/// Token representing a `notify_one` notification that has been received.
///
/// While `condvar` is `Some`, dropping the baton calls
/// `Condvar::notify_one` on that condvar (see the `Drop` impl), so the
/// notification is handed on rather than lost.  `dispose` clears the field
/// first, which suppresses that call.
#[derive(Debug)]
pub struct Baton<'c> {
condvar: Option<&'c Condvar>, }
impl Condvar {
    /// Creates a condition variable; identical to `Condvar::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
/// Lock guards that can be given up and later re-acquired.
///
/// This is what `Condvar::wait_baton` needs from the guard it is handed:
/// a way to turn the guard into something from which the lock can be
/// taken again, and a way to start taking it.
pub trait RelockMutexGuard {
/// What is kept while waiting, and later passed to `lock`.
/// The waiter clones it when it starts relocking.
type MutexRef: Clone + Send;
/// The guard type produced by relocking; this is what waiting returns.
type JustGuard;
/// The future returned by `lock`; it resolves to the new guard.
type LockFuture: Future<Output=Self::JustGuard> + Send;
/// Consumes the guard and returns the value needed to lock again later.
fn unlock_for_relock(self) -> Self::MutexRef;
/// Starts (re)acquiring the lock identified by `r`.
fn lock(r: Self::MutexRef) -> Self::LockFuture;
}
/// Implements the `RelockMutexGuard` trait for a guard type.
///
/// Five input forms are accepted.  The first four are conveniences which
/// expand to the fifth:
///
/// 1. `Struct (Guard) [Mutex] l => lock, g => get_mutex`:
///    defines `pub struct Struct<'l,T>(pub Guard<'l,T>)` and implements
///    the trait for it.
/// 2. `Struct (Guard, Mutex) l => lock`:
///    defines `pub struct Struct<'i,'o,T>(pub Guard<'i,T>, pub &'o Mutex<T>)`
///    and implements the trait for it; the mutex reference is field `.1`.
/// 3. `(Guard) [Mutex] l => lock, g => get_mutex`:
///    implements the trait directly for `Guard<'l,T>`.
/// 4. `(Guard, Mutex) l => lock`:
///    implements the trait for the tuple `(Guard<'i,T>, &'o Mutex<T>)`.
/// 5. `<generics> (GuardIn) [MutexRef, GuardOut] l => lock, g => get_mutex,
///    where T: bounds`: the fully explicit form.
///
/// In every form `l => lock` is a pattern for the `MutexRef` value and an
/// expression producing the relock future, and `g => get_mutex` is a
/// pattern for the guard and an expression producing the `MutexRef`.
/// Forms 1 to 4 accept optional `, where Bound` items which become extra
/// bounds on `T`.
#[macro_export]
macro_rules! RelockMutexGuard {
// Form 1: newtype around the guard; `$get_mutex` extracts the mutex
// reference from the guard bound by `$g`.
{ $struct:ident ($($guard:tt)+) $(,)? [$($mutex:tt)+] $(,)?
$l:pat => $lock:expr,
$g:pat => $get_mutex:expr
$( , where $xbound:path )* $(,)?
} => {
pub struct $struct<'l,T>(pub $($guard)+<'l,T>);
$crate::RelockMutexGuard! {
<'l,T>
( $struct<'l,T> )
[ &'l $($mutex)+ <T>, $($guard)+ <'l,T> ],
$l => $lock,
$struct($g) => $get_mutex,
where T: $( $xbound + )*
}
};
// Form 2: new struct holding both the guard and a reference to the mutex,
// for guard types which cannot hand back their mutex themselves.
{ $struct:ident ($guard:ident $(:: $guardx:ident)*, $($mutex:tt)+) $(,)?
$l:pat => $lock:expr
$( , where $xbound:path )* $(,)?
} => {
pub struct $struct<'i,'o,T>(pub $guard $(:: $guardx)*<'i,T>,
pub &'o $($mutex)+<T>);
$crate::RelockMutexGuard! {
<'o,'i,T>
( $struct<'i,'o,T> )
[ &'o $($mutex)+ <T>, $guard $(:: $guardx)* <'o,T> ],
$l => $lock,
g => g.1,
where T: $( $xbound + )*
}
};
// Form 3: implement directly for the guard type itself.
{ ($($guard:tt)+) $(,)? [$($mutex:tt)+] $(,)?
$l:ident => $lock:expr,
$g:ident => $get_mutex:expr
$( , where $xbound:path )* $(,)?
} => {
$crate::RelockMutexGuard! {
<'l,T>
( $($guard)* <'l,T> )
[ &'l $($mutex) +<T>, $($guard)* <'l,T> ],
$l => $lock,
$g => $get_mutex,
where T: $( $xbound + )*
}
};
// Form 4: implement for a `(guard, &mutex)` tuple; the mutex reference is
// simply the tuple's second element.
{ ($guard:ident $(:: $guardx:ident)*, $($mutex:tt)+) $(,)?
$l:ident => $lock:expr
$( , where $xbound:path )* $(,)?
} => {
$crate::RelockMutexGuard! {
<'o,'i,T>
( ($guard $(:: $guardx)* <'i,T>, &'o $($mutex)+ <T>) )
[ &'o $($mutex)+ <T>, $guard $(:: $guardx)* <'o,T> ],
$l => $lock,
g => g.1,
where T: $( $xbound + )*
}
};
// Form 5: the explicit form which actually emits the `impl`.
{ < $($gen_lf0:lifetime, $($gen_lf1:lifetime,)*)? $($gen_ty:ident),* > $(,)?
( $guard_in:ty ) $(,)?
[ $mutexref:ty, $guard_out:path ] $(,)?
$l:pat => $lock:expr ,
$g:pat => $get_mutex:expr ,
where $t:ident : $($bound:tt)*
} => {
impl < $($gen_lf0, $($gen_lf1,)*)? $($gen_ty),* >
$crate::RelockMutexGuard for $guard_in
// `Send` is always required of `$t`, in addition to the caller's bounds.
where $t : std::marker::Send + $($bound)*
{
type MutexRef = $mutexref;
type JustGuard = $guard_out;
// The relock future is type-erased: boxed, pinned, `Send`, and (when
// generic lifetimes were given) bounded by the first of them.
type LockFuture = std::pin::Pin<std::boxed::Box<
dyn std::future::Future<Output=Self::JustGuard>
+ std::marker::Send $(+ $gen_lf0)?
>>;
fn unlock_for_relock(self) -> Self::MutexRef {
// `self` is moved into the pattern; whatever `$get_mutex` does not
// return is dropped when this function returns.
let $g = self;
$get_mutex
}
fn lock($l: Self::MutexRef) -> Self::LockFuture {
std::boxed::Box::pin($lock)
}
}
}
}
// `std::sync::Mutex`: implemented for the tuple
// `(std::sync::MutexGuard<T>, &std::sync::Mutex<T>)`.
// Relocking calls the blocking `lock()` inside the future and `unwrap`s the
// result, so it panics if the mutex is poisoned.
RelockMutexGuard!{
(std::sync::MutexGuard, std::sync::Mutex),
l => async move { l.lock().unwrap() },
}
/// "Relocking" for `NotRelockable` is a no-op: the wrapped guard is simply
/// dropped, and the relock future is immediately ready with `()`.
impl<G> RelockMutexGuard for NotRelockable<G> {
    type MutexRef = ();
    type JustGuard = ();
    type LockFuture = std::future::Ready<()>;

    fn unlock_for_relock(self) -> Self::MutexRef {
        // Nothing is kept for later; giving up the guard is all there is to do.
        drop(self);
    }

    fn lock(_: Self::MutexRef) -> Self::LockFuture {
        std::future::ready(())
    }
}
// Generates the `RelockMutexGuard` impls for one version of parking_lot.
//
// Arguments: the cargo feature name gating the impls, the name under which
// that parking_lot version is available as a crate, and optionally the
// identifier `FairMutex` (when given, `FairMutexGuard` gets an impl too).
macro_rules! impl_parking_lot { {
$feat:literal, $parking_lot:ident,
$( $FairMutex:ident, )?
} => {
// Plain mutex: implemented directly on the guard; the mutex reference is
// recovered from the guard with `lock_api::MutexGuard::mutex`.
#[cfg(feature=$feat)]
RelockMutexGuard!{
($parking_lot::MutexGuard) [$parking_lot::Mutex],
l => async move { l.lock() },
g => $parking_lot::lock_api::MutexGuard::mutex(&g),
}
// Fair mutex: same scheme, only emitted when `$FairMutex` was supplied.
$(
#[cfg(feature=$feat)]
RelockMutexGuard!{
($parking_lot::FairMutexGuard) [$parking_lot::$FairMutex],
l => async move { l.lock() },
g => $parking_lot::lock_api::MutexGuard::mutex(&g),
}
)?
} }
// One invocation per supported parking_lot version.  The 0.9 line does not
// pass `FairMutex`, so no fair-mutex impl is generated for it.
impl_parking_lot!{ "parking_lot_0_12", parking_lot , FairMutex, }
impl_parking_lot!{ "parking_lot_0_11", parking_lot_0_11, FairMutex, }
impl_parking_lot!{ "parking_lot_0_10", parking_lot_0_10, FairMutex, }
impl_parking_lot!{ "parking_lot_0_9", parking_lot_0_9, }
// ---- tokio (all gated on the "tokio" feature) ----

// tokio `Mutex`: implemented for the tuple `(MutexGuard, &Mutex)`;
// relocking is `Mutex::lock`.
#[cfg(feature="tokio")]
RelockMutexGuard!{
(tokio::sync::MutexGuard, tokio::sync::Mutex),
l => l.lock(),
}
// tokio `RwLock`, read side: tuple `(RwLockReadGuard, &RwLock)`;
// relocking is `RwLock::read`.  Adds a `Sync` bound on `T`.
#[cfg(feature="tokio")]
RelockMutexGuard!{
(tokio::sync::RwLockReadGuard, tokio::sync::RwLock)
l => l.read(),
where Sync
}
// tokio `RwLock`, write side: tuple `(RwLockWriteGuard, &RwLock)`;
// relocking is `RwLock::write`.  Adds a `Sync` bound on `T`.
#[cfg(feature="tokio")]
RelockMutexGuard!{
(tokio::sync::RwLockWriteGuard, tokio::sync::RwLock),
l => l.write(),
where Sync
}
// Owned tokio mutex guard: tuple `(OwnedMutexGuard<T>, Arc<Mutex<T>>)`.
// The `Arc` (tuple field 1) is what is kept for relocking, via `lock_owned`.
#[cfg(feature="tokio")]
RelockMutexGuard!{
<T>
( (tokio::sync::OwnedMutexGuard<T>,
std::sync::Arc<tokio::sync::Mutex<T>>) )
[ std::sync::Arc<tokio::sync::Mutex<T>>,
tokio::sync::OwnedMutexGuard<T> ],
l => async move { l.lock_owned().await },
g => g.1,
where T: 'static
}
// Owned tokio read guard: tuple `(OwnedRwLockReadGuard<T>, Arc<RwLock<T>>)`;
// relocking is `read_owned`.
#[cfg(feature="tokio")]
RelockMutexGuard!{
<T>
( (tokio::sync::OwnedRwLockReadGuard<T>,
std::sync::Arc<tokio::sync::RwLock<T>>) )
[ std::sync::Arc<tokio::sync::RwLock<T>>,
tokio::sync::OwnedRwLockReadGuard<T> ],
l => async move { l.read_owned().await },
g => g.1,
where T: Sync + 'static
}
// Owned tokio write guard: tuple `(OwnedRwLockWriteGuard<T>, Arc<RwLock<T>>)`;
// relocking is `write_owned`.
#[cfg(feature="tokio")]
RelockMutexGuard!{
<T>
( (tokio::sync::OwnedRwLockWriteGuard<T>,
std::sync::Arc<tokio::sync::RwLock<T>>) )
[ std::sync::Arc<tokio::sync::RwLock<T>>,
tokio::sync::OwnedRwLockWriteGuard<T> ],
l => async move { l.write_owned().await },
g => g.1,
where T: Sync + 'static
}
// ---- smol (all gated on the "smol" feature) ----

// smol `Mutex`: implemented directly on `MutexGuard`; the mutex reference is
// obtained from the guard with `MutexGuard::source`.
#[cfg(feature="smol")]
RelockMutexGuard!{
(smol::lock::MutexGuard) [smol::lock::Mutex],
l => l.lock(),
g => smol::lock::MutexGuard::source(&g),
}
// smol `RwLock`, read side: tuple `(RwLockReadGuard, &RwLock)`;
// relocking is `RwLock::read`.  Adds a `Sync` bound on `T`.
#[cfg(feature="smol")]
RelockMutexGuard!{
(smol::lock::RwLockReadGuard, smol::lock::RwLock)
l => l.read(),
where Sync
}
// smol `MutexGuardArc`: implemented directly on the guard.  A clone of the
// `Arc` returned by `MutexGuardArc::source` is kept for relocking, which is
// done with `lock_arc`.
#[cfg(feature="smol")]
RelockMutexGuard!{
<T>
(smol::lock::MutexGuardArc<T>)
[std::sync::Arc<smol::lock::Mutex<T>>, smol::lock::MutexGuardArc<T>],
l => async move { l.lock_arc().await },
g => smol::lock::MutexGuardArc::source(&g).clone(),
where T: 'static
}
/// The state protected by the mutex inside `Condvar`.
#[derive(Debug,Default)]
struct CV {
/// One entry per `Waiter` that is still in its waiting state.
/// `wait_baton` appends at the back; `notify_one` looks at the front.
list: dlv_list::VecList<Entry>,
}
/// Index of one waiter's `Entry` within `CV::list`.
type I = dlv_list::Index<Entry>;
/// State of one waiter, as recorded in `CV::list`.
#[derive(Debug)]
enum Entry {
/// Not notified yet.  The waker is `None` until the waiter has been
/// polled; notifiers `take()` it when they wake the waiter.
Waiting(Option<Waker>),
/// `Signaled`: marked by `notify_one`; the waiter will come back with a
/// baton.  `Broadcasted`: marked by `notify_all`; no baton.
Signaled, Broadcasted,
}
use Entry::*;
// State machine behind `Waiter`:
//
//   Waiting --(entry marked Signaled/Broadcasted, seen by poll)--> Locking
//   Locking --(relock future ready)--> Ended
//
// `Waiting` holds the bookkeeping needed while queued on the condvar.
// `Locking` holds the in-progress relock future (pinned) together with the
// record of whether a baton is owed.  `Ended` means the result has already
// been returned; polling in that state panics.
pin_project! {
#[project=WSProj]
enum WaitState<'c,G> where G: RelockMutexGuard {
Waiting {
ns: WS_Waiting<'c,G>
},
Locking {
ns: WS_Locking_NS<'c>, #[pin] locking: G::LockFuture,
},
Ended,
}}
// Short alias used when constructing states.
type WS<'c,G> = WaitState<'c,G>;
/// Payload of `WaitState::Waiting`.
///
/// Has a `Drop` impl which removes the entry from the condvar's list if it
/// is still there (i.e. if `ent` is still `Some`).
#[derive(Debug)]
#[allow(non_camel_case_types)]
struct WS_Waiting<'c,G> where G: RelockMutexGuard {
/// The condvar we are queued on.
condvar: &'c Condvar,
/// Index of our entry in the condvar's list; `None` once the entry has
/// been removed.  `lock` is what will be handed to `G::lock` to relock.
ent: Option<I>, lock: G::MutexRef,
}
/// The non-pinned part of `WaitState::Locking`.
///
/// Has a `Drop` impl: if `baton` is still `true` when this is dropped, a
/// `Baton` is created and immediately dropped, which calls `notify_one`.
#[derive(Debug)]
#[allow(non_camel_case_types)]
struct WS_Locking_NS<'c> {
/// The condvar the waiter was queued on.
condvar: &'c Condvar,
/// `true` if we were woken by `notify_one` and have not yet handed the
/// resulting baton to the caller.
baton: bool,
}
impl Condvar {
    /// Starts waiting for a notification, giving up `guard` in the process.
    ///
    /// The returned future resolves to the re-acquired guard together with
    /// `Some(Baton)` if the wakeup came from `notify_one`, or `None` if it
    /// came from `notify_all`.
    pub fn wait_baton<'c, G>(&'c self, guard: G) -> Waiter<'c, G>
    where
        G: RelockMutexGuard,
    {
        // Our list entry is added under the condvar's internal lock, and only
        // then is the caller's guard given up; the internal lock is held
        // until this function returns.
        let mut cv = self.0.lock();
        let ent = Some(cv.list.push_back(Waiting(None)));
        let lock = guard.unlock_for_relock();
        let ns = WS_Waiting { condvar: self, ent, lock };
        Waiter { waitstate: WS::Waiting { ns } }
    }

    /// Like `wait_baton`, but any baton received is disposed of right away,
    /// so only the re-acquired guard is returned.
    pub async fn wait<'c, G>(&'c self, guard: G) -> G::JustGuard
    where
        G: RelockMutexGuard,
    {
        let (relocked, baton) = self.wait_baton(guard).await;
        if let Some(baton) = baton {
            baton.dispose();
        }
        relocked
    }

    /// Like `wait_baton`, but `guard` is simply dropped and nothing is
    /// relocked afterwards; only the baton (if any) is returned.
    pub async fn wait_no_relock<'c, G>(&'c self, guard: G) -> Option<Baton<'c>> {
        self.wait_baton(NotRelockable(guard)).await.1
    }
}
// Wrapper used by `wait_no_relock`: lets any value `G` be passed to
// `wait_baton` as a "guard" which is dropped and never re-acquired.
struct NotRelockable<G>(G);
// Drives the `WaitState` machine: first wait to be notified, then relock.
impl<'c,G> Future for Waiter<'c,G> where G: RelockMutexGuard {
// The re-acquired guard, plus a baton if we were woken by `notify_one`.
type Output = (G::JustGuard, Option<Baton<'c>>);
// Panics if polled again after it has returned `Ready`.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Self::Output>
{
// Loop so that after moving from Waiting to Locking the new state is
// polled straight away, within this same call.
loop {
match self.as_mut().project().waitstate.project() {
WSProj::Ended => panic!(),
WSProj::Waiting { ns: WS_Waiting { condvar, ent, lock } } => {
let condvar = *condvar;
let mut cv = condvar.0.lock();
// `ent` is `Some` for as long as we are in this state: it is set by
// `wait_baton` and only taken just below, on the way out.
let entry = &mut cv.list.get_mut(ent.unwrap()).unwrap();
let baton = match entry {
// notify_one: we will owe the caller a baton.
Signaled => true,
// notify_all: no baton.
Broadcasted => false,
// Not notified yet: store the current waker for the notifiers to use.
Waiting(waker) => {
*waker = Some(cx.waker().clone());
return Pending;
}
};
// We have been notified, so leave the list.  `take()` leaves `None`
// behind so that `WS_Waiting`'s Drop (run by `set` below) does not try
// to remove the entry a second time.
cv.list.remove(ent.take().unwrap());
let locking = <G as RelockMutexGuard>::lock(lock.clone());
self.as_mut().set(Waiter { waitstate: { WS::Locking {
ns: WS_Locking_NS { condvar, baton },
locking,
} } } );
},
WSProj::Locking { ns: WS_Locking_NS { condvar, baton }, locking } => {
let guard = match locking.poll(cx) {
Pending => return Pending,
Ready(guard) => guard,
};
// Turn the flag into a real baton for the caller, and clear the flag so
// that `WS_Locking_NS`'s Drop (run by `set` below) does not also notify.
let rbaton = condvar.baton_from_bool(*baton);
*baton = false;
self.as_mut().set(Waiter { waitstate: WS::Ended });
return Ready((guard, rbaton))
}
}
}
}
}
impl Condvar {
    /// Notifies one waiter (the one at the front of the queue, if any).
    ///
    /// Takes the condvar's internal lock for the duration of the call.
    pub fn notify_one(&self) {
        let mut cv = self.0.lock();
        cv.notify_one();
    }
}
impl CV {
fn notify_one(&mut self) {
if let Some(entry) = self.list.front_mut() {
match entry {
Signaled | Broadcasted => { }, Waiting(waker) => {
if let Some(waker) = waker.take() { waker.wake() }
*entry = Signaled;
},
};
}
}
}
impl Condvar {
pub fn notify_all(&self) {
let mut cv = self.0.lock();
for entry in cv.list.iter_mut() {
match entry {
Signaled | Broadcasted => {
*entry = Broadcasted; },
Waiting(waker) => {
if let Some(waker) = waker.take() { waker.wake() }
*entry = Broadcasted;
},
};
}
}
}
impl Condvar {
    /// Makes a baton for this condvar if `yes`, otherwise `None`.
    fn baton_from_bool<'c>(&'c self, yes: bool) -> Option<Baton<'c>> {
        yes.then(|| self.make_baton())
    }

    /// Creates a fresh baton tied to this condvar.
    ///
    /// Unless it is disposed of, dropping the returned baton calls
    /// `notify_one` on this condvar.
    pub fn make_baton<'c>(&'c self) -> Baton<'c> {
        let condvar = Some(self);
        Baton { condvar }
    }
}
impl Baton<'_> {
    /// Discards the baton without notifying anyone.
    pub fn dispose(mut self) {
        // With the reference cleared, the Drop impl has nothing to notify.
        self.condvar = None;
    }

    /// Hands the notification on; the Drop impl does the actual work.
    pub fn pass(self) {
        drop(self);
    }
}
/// Extension trait providing `dispose` and `pass` on values other than a
/// bare `Baton`; in this file it is implemented for `Option<Baton>`.
pub trait BatonExt: Sized {
/// Discards the baton, if there is one, without notifying anyone.
fn dispose(self);
/// Consumes `self`.  The default body is empty, so `self` is simply
/// dropped, and any baton inside passes its notification on as it drops.
fn pass(self) { }
}
impl BatonExt for Option<Baton<'_>> {
    /// Disposes of the baton if there is one; does nothing for `None`.
    fn dispose(self) {
        if let Some(baton) = self {
            baton.dispose();
        }
    }
}
/// Leaves the condvar's waiter list when a still-queued waiter goes away
/// (typically because the `Waiter` future was dropped, i.e. cancelled).
impl<G> Drop for WS_Waiting<'_,G> where G: RelockMutexGuard {
    fn drop(&mut self) {
        // `ent` is `None` if `poll` already removed our entry; then there is
        // nothing to do, and the condvar's lock must not be taken here.
        if let Some(ent) = self.ent.take() {
            let mut cv = self.condvar.0.lock();
            let removed = cv.list.remove(ent);
            // If `notify_one` had already picked us but we were dropped before
            // being polled, that notification must not be lost: hand it to the
            // next waiter.  The lock is already held, so call `CV::notify_one`
            // directly rather than `Condvar::notify_one`.
            // `Waiting` and `Broadcasted` entries carry nothing to hand on.
            if let Some(Signaled) = removed {
                cv.notify_one();
            }
        }
    }
}
/// If the waiter goes away while relocking and still owes a baton, the baton
/// is materialised and dropped on the spot, which passes the notification on.
impl Drop for WS_Locking_NS<'_> {
    fn drop(&mut self) {
        drop(self.condvar.baton_from_bool(self.baton));
    }
}
/// Dropping a baton that has not been disposed of passes the notification
/// on by calling `notify_one` on its condvar.
impl Drop for Baton<'_> {
    fn drop(&mut self) {
        let condvar = match self.condvar.take() {
            Some(condvar) => condvar,
            // Already disposed of: nobody to notify.
            None => return,
        };
        condvar.notify_one();
    }
}