#![forbid(unsafe_code)]
use core::fmt::{Debug, Formatter};
use core::hash::{Hash, Hasher};
use core::pin::Pin;
use core::sync::atomic::AtomicBool;
use core::task::{Context, Poll, Waker};
use core::time::Duration;
use std::collections::HashSet;
use std::future::Future;
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::time::Instant;
#[derive(Clone)]
struct ArcNode(Arc<Node>);
impl PartialEq for ArcNode {
fn eq(&self, other: &Self) -> bool {
Arc::as_ptr(&self.0).eq(&Arc::as_ptr(&other.0))
}
}
impl Eq for ArcNode {}
impl Hash for ArcNode {
fn hash<H: Hasher>(&self, state: &mut H) {
Arc::as_ptr(&self.0).hash(state);
}
}
/// Mutable bookkeeping for one node: its revocation flag, the waker of a
/// pending poll, and the set of subordinate nodes.
struct Inner {
    /// `true` if constructed revoked or once `revoke` has been called.
    revoked: bool,
    /// Waker saved by the most recent `poll` that returned `Pending`.
    opt_waker: Option<Waker>,
    /// Subordinate nodes, keyed by pointer identity.
    subs: HashSet<ArcNode>,
}

impl Inner {
    /// Creates state with the given revocation flag, no waker and no subordinates.
    #[must_use]
    pub fn new(revoked: bool) -> Self {
        Self {
            subs: HashSet::new(),
            opt_waker: None,
            revoked,
        }
    }

    /// Registers `node` as a subordinate.
    ///
    /// Does nothing when this state is already revoked.
    pub fn add_sub(&mut self, node: &Arc<Node>) {
        if self.revoked {
            return;
        }
        let entry = ArcNode(Arc::clone(node));
        self.subs.insert(entry);
    }

    /// Unregisters `node`; a node that is not in the set is silently ignored.
    pub fn remove_sub(&mut self, node: &Arc<Node>) {
        // Lookup is by pointer identity, so a wrapper around a cloned handle
        // finds the stored entry.
        self.subs.remove(&ArcNode(Arc::clone(node)));
    }

    /// Returns `Ready` if revoked; otherwise stores the context's waker
    /// (replacing any earlier one) and returns `Pending`.
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.revoked {
            return Poll::Ready(());
        }
        let waker = cx.waker().clone();
        self.opt_waker = Some(waker);
        Poll::Pending
    }

    /// Marks the state revoked and returns the stored waker (if any) together
    /// with a snapshot of the current subordinates.
    ///
    /// The waker is removed from the state; the subordinate set is left intact.
    pub fn revoke(&mut self) -> (Option<Waker>, Vec<ArcNode>) {
        self.revoked = true;
        let waker = self.opt_waker.take();
        let snapshot: Vec<ArcNode> = self.subs.iter().cloned().collect();
        (waker, snapshot)
    }
}
/// One vertex in a tree of revocable permits.
///
/// Links run both ways: `superior` is a weak reference up the tree, while the
/// `subs` set inside `inner` holds strong references down the tree.
struct Node {
    /// The node this one is subordinate to; an empty `Weak` for an apex node.
    superior: Weak<Node>,
    /// Revocation flag that can be read without taking the lock.
    atomic_revoked: AtomicBool,
    /// Lock-protected revocation flag, stored waker and subordinate set.
    inner: Mutex<Inner>,
    /// Notified when this node is revoked and when a subordinate is removed.
    condvar: Condvar,
}

impl Node {
    /// Builds a node whose lock-free flag and locked state both start as `revoked`.
    #[must_use]
    pub fn new(revoked: bool, superior: Weak<Self>) -> Self {
        Node {
            superior,
            atomic_revoked: AtomicBool::new(revoked),
            inner: Mutex::new(Inner::new(revoked)),
            condvar: Condvar::new(),
        }
    }

    /// Builds an unrevoked node that has no superior.
    #[must_use]
    pub fn new_apex() -> Self {
        Self::new(false, Weak::new())
    }

    /// Creates a node subordinate to `self`.
    ///
    /// The new node starts revoked if `self` is revoked; otherwise it is
    /// registered in `self`'s subordinate set so a later revocation reaches it.
    ///
    /// # Panics
    /// Panics if this node's mutex is poisoned.
    #[must_use]
    pub fn new_sub(self: &Arc<Self>) -> Arc<Self> {
        // Decide the initial state and register under ONE lock acquisition.
        // `revoke` flips `inner.revoked` and snapshots `subs` under the same
        // lock, so the new node is either in that snapshot or born revoked.
        // Reading the flag before locking would let a concurrent `revoke`
        // slip in between, leaving the node unrevoked and unregistered.
        let mut inner = self.inner.lock().unwrap();
        let node = Arc::new(Self::new(inner.revoked, Arc::downgrade(self)));
        inner.add_sub(&node);
        node
    }

    /// Creates a sibling of `self`: a node with the same superior and the same
    /// revoked state.
    ///
    /// If the superior is still alive, the sibling is registered with it
    /// (unless the superior is already revoked, in which case the sibling is
    /// created revoked). If there is no live superior, the sibling is a
    /// stand-alone node.
    ///
    /// # Panics
    /// Panics if the superior's mutex is poisoned.
    #[must_use]
    pub fn new_clone(self: &Arc<Self>) -> Arc<Self> {
        let superior_link = Weak::clone(&self.superior);
        match self.superior.upgrade() {
            Some(superior) => {
                // Same reasoning as in `new_sub`: look at the superior's state
                // and register while holding the superior's lock, so that a
                // concurrent revocation of the superior cannot miss the clone.
                let mut superior_inner = superior.inner.lock().unwrap();
                let revoked = superior_inner.revoked || self.is_revoked();
                let node = Arc::new(Self::new(revoked, superior_link));
                superior_inner.add_sub(&node);
                node
            }
            None => Arc::new(Self::new(self.is_revoked(), superior_link)),
        }
    }

    /// Registers `node` as a subordinate of `self` (ignored if `self` is revoked).
    ///
    /// # Panics
    /// Panics if this node's mutex is poisoned.
    pub fn add_sub(self: &Arc<Self>, node: &Arc<Node>) {
        self.inner.lock().unwrap().add_sub(node);
    }

    /// Unregisters `node` and notifies every thread waiting on the condvar,
    /// so waiters that watch the subordinate set can re-check it.
    fn remove_sub(&self, node: &Arc<Node>) {
        // The guard is a temporary, so the lock is released before notifying.
        self.inner.lock().unwrap().remove_sub(node);
        self.condvar.notify_all();
    }

    /// Reads the lock-free revocation flag.
    #[must_use]
    pub fn is_revoked(&self) -> bool {
        self.atomic_revoked
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Polls the locked state: `Ready` once revoked, otherwise stores the
    /// waker from `cx` and returns `Pending`.
    ///
    /// # Panics
    /// Panics if this node's mutex is poisoned.
    pub fn poll(self: &Arc<Self>, cx: &mut Context<'_>) -> Poll<()> {
        self.inner.lock().unwrap().poll(cx)
    }

    /// Revokes this node and, recursively, every registered subordinate.
    ///
    /// The stored waker is always taken out of the state but is only woken
    /// when `wake` is `true`. Subordinates are always revoked with waking on.
    fn revoke(self: &Arc<Self>, wake: bool) {
        self.atomic_revoked
            .store(true, std::sync::atomic::Ordering::Relaxed);
        // The guard is dropped at the end of this statement, so no lock is
        // held while notifying, waking, or descending into subordinates.
        let (opt_waker, subs) = self.inner.lock().unwrap().revoke();
        self.condvar.notify_all();
        if wake {
            if let Some(waker) = opt_waker {
                waker.wake();
            }
        }
        for sub in subs {
            sub.0.revoke(true);
        }
    }

    /// Detaches this node from its superior (if that superior is still alive)
    /// and then revokes it without waking its own stored waker.
    pub fn revoke_and_remove_from_superior(self: &Arc<Self>) {
        if let Some(superior) = self.superior.upgrade() {
            superior.remove_sub(self);
        }
        self.revoke(false);
    }
}
/// Error value signalling that a timed wait ran out of time.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct DeadlineExceeded;

impl core::fmt::Display for DeadlineExceeded {
    /// Writes the fixed text `DeadlineExceeded`.
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        f.write_str("DeadlineExceeded")
    }
}

impl std::error::Error for DeadlineExceeded {}
/// Error value signalling that a permit was revoked.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PermitRevoked;

impl core::fmt::Display for PermitRevoked {
    /// Writes the fixed text `PermitRevoked`.
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        f.write_str("PermitRevoked")
    }
}

impl std::error::Error for PermitRevoked {}
/// Handle to one node in a tree of revocable permits.
///
/// Revoking a permit also revokes the permits subordinate to it.
pub struct Permit {
    node: Arc<Node>,
}

impl Permit {
    /// Creates an unrevoked permit that has no superior.
    #[must_use]
    pub fn new() -> Self {
        let node = Arc::new(Node::new_apex());
        Self { node }
    }

    /// Creates a permit subordinate to this one.
    #[must_use]
    pub fn new_sub(&self) -> Self {
        let node = self.node.new_sub();
        Self { node }
    }

    /// Returns `true` if this permit has been revoked.
    #[must_use]
    pub fn is_revoked(&self) -> bool {
        self.node.is_revoked()
    }

    /// Returns `Some(())` while the permit is not revoked and `None` once it
    /// is, which makes it usable with the `?` operator.
    #[must_use]
    pub fn ok(&self) -> Option<()> {
        if self.node.is_revoked() {
            return None;
        }
        Some(())
    }

    /// Detaches this permit from its superior and revokes it together with
    /// its subordinates. Returns `self` so calls can be chained.
    #[allow(clippy::must_use_candidate)]
    pub fn revoke(&self) -> &Self {
        let node = &self.node;
        node.revoke_and_remove_from_superior();
        self
    }

    /// Returns `true` if at least one subordinate is currently registered.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    #[allow(clippy::missing_panics_doc)]
    #[must_use]
    pub fn has_subs(&self) -> bool {
        let state = self.node.inner.lock().unwrap();
        !state.subs.is_empty()
    }

    /// Blocks the calling thread for `duration`, returning early if the
    /// permit is revoked.
    ///
    /// # Errors
    /// Returns `PermitRevoked` if the permit is already revoked or becomes
    /// revoked before `duration` has elapsed.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    #[allow(clippy::missing_panics_doc)]
    pub fn sleep(&self, duration: Duration) -> Result<(), PermitRevoked> {
        let node = &self.node;
        let locked = node.inner.lock().unwrap();
        // Keep waiting for as long as the permit has not been revoked.
        let (_locked, outcome) = node
            .condvar
            .wait_timeout_while(locked, duration, |state| !state.revoked)
            .unwrap();
        if !outcome.timed_out() {
            return Err(PermitRevoked {});
        }
        Ok(())
    }

    /// Like [`Permit::sleep`], but sleeps until `deadline`. A deadline in the
    /// past is treated as a zero-length sleep.
    ///
    /// # Errors
    /// Returns `PermitRevoked` under the same conditions as `sleep`.
    pub fn sleep_until(&self, deadline: Instant) -> Result<(), PermitRevoked> {
        let now = Instant::now();
        self.sleep(deadline.saturating_duration_since(now))
    }

    /// Blocks the calling thread until no subordinates are registered, or
    /// until `duration` has elapsed.
    ///
    /// # Errors
    /// Returns `DeadlineExceeded` if subordinates are still registered when
    /// the time runs out.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    #[allow(clippy::missing_panics_doc)]
    pub fn wait_subs_timeout(&self, duration: Duration) -> Result<(), DeadlineExceeded> {
        let node = &self.node;
        let locked = node.inner.lock().unwrap();
        // Keep waiting for as long as any subordinate is still registered.
        let (_locked, outcome) = node
            .condvar
            .wait_timeout_while(locked, duration, |state| !state.subs.is_empty())
            .unwrap();
        if outcome.timed_out() {
            return Err(DeadlineExceeded {});
        }
        Ok(())
    }

    /// Like [`Permit::wait_subs_timeout`], but waits until `deadline`. A
    /// deadline in the past is treated as a zero-length wait.
    ///
    /// # Errors
    /// Returns `DeadlineExceeded` under the same conditions as
    /// `wait_subs_timeout`.
    pub fn wait_subs_deadline(&self, deadline: Instant) -> Result<(), DeadlineExceeded> {
        let now = Instant::now();
        self.wait_subs_timeout(deadline.saturating_duration_since(now))
    }
}
impl Drop for Permit {
fn drop(&mut self) {
self.node.revoke_and_remove_from_superior();
}
}
impl Clone for Permit {
    /// Creates a separate permit backed by a new node obtained from
    /// `Node::new_clone`; the clone does not share this permit's node.
    fn clone(&self) -> Self {
        let node = self.node.new_clone();
        Self { node }
    }
}
impl Debug for Permit {
    /// Formats as `Permit{revoked=<bool>,num_subs=<n>}`.
    ///
    /// `num_subs` is the weak-reference count of this permit's node, read
    /// without taking the node's lock; it is not the size of the `subs` set.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
        let revoked = self.is_revoked();
        let num_subs = Arc::weak_count(&self.node);
        write!(f, "Permit{{revoked={},num_subs={}}}", revoked, num_subs)
    }
}
impl Default for Permit {
fn default() -> Self {
Self::new()
}
}
impl Future for Permit {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.node.poll(cx)
}
}