use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::{Notify, watch};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
/// Returns the current value of a watch channel, or a task that waits for one.
///
/// * `Ok(value)` — the channel already holds `Some(value)`; a clone is returned
///   and nothing is spawned. `token` is not consulted on this path.
/// * `Err(handle)` — the channel currently holds `None`. A Tokio task has been
///   spawned (so this must be called from within a Tokio runtime) and `handle`
///   resolves to:
///   * `Some(value)` once the channel holds a `Some`,
///   * `None` if `token` is cancelled first, or if `wait_for` returns an error.
///
/// Note that `Err` is not a failure here; it only means "not available yet".
pub fn get_or_receiver<T: Send + Sync + Clone + 'static>(
    sender: &watch::Sender<Option<T>>,
    token: CancellationToken,
) -> Result<T, JoinHandle<Option<T>>> {
    // Copy the state out in a statement of its own so the read guard returned
    // by `borrow()` is released right here. Using it as an `if let` scrutinee
    // would keep the guard alive through the `else` branch (subscribe + spawn)
    // on editions before 2024.
    let current = sender.borrow().clone();
    if let Some(value) = current {
        return Ok(value);
    }

    let mut rx = sender.subscribe();
    Err(tokio::spawn(async move {
        tokio::select! {
            // Poll the value branch first. The fast path above hands out an
            // available value without looking at the token, so when a value
            // and a cancellation are both ready the value must win here too,
            // instead of the outcome being picked at random.
            biased;
            result = rx.wait_for(|v| v.is_some()) => {
                // `result` is a guard over the channel's `Option<T>`; clone
                // the inner option out of it before the guard is dropped.
                result.ok().and_then(|v| v.clone())
            }
            _ = token.cancelled() => None,
        }
    }))
}
/// Waits until the watch channel holds a value and returns a clone of it.
///
/// Delegates to [`get_or_receiver`]: if a value is already present it is
/// returned immediately, otherwise the handle of the spawned waiter task is
/// awaited.
///
/// Returns `None` when the waiter task resolves to `None` or when awaiting
/// its `JoinHandle` yields an error.
pub async fn get_value<T: Send + Sync + Clone + 'static>(
    sender: &watch::Sender<Option<T>>,
    token: CancellationToken,
) -> Option<T> {
    let waiter = match get_or_receiver(sender, token) {
        Ok(ready) => return Some(ready),
        Err(handle) => handle,
    };

    // A join error collapses to `None`, same as "no value".
    waiter.await.ok().flatten()
}
/// Reports whether a watch channel holds a value, or returns a task that waits
/// for one.
///
/// * `Ok(())` — the channel already holds a `Some`; nothing is spawned and
///   `token` is not consulted.
/// * `Err(handle)` — the channel currently holds `None`. A Tokio task has been
///   spawned (so this must be called from within a Tokio runtime) and `handle`
///   resolves to:
///   * `true` once the channel holds a `Some`,
///   * `false` if `token` is cancelled first, or if `wait_for` returns an
///     error.
///
/// Note that `Err` is not a failure here; it only means "not available yet".
pub fn check_or_receiver<T: Send + Sync + Clone + 'static>(
    sender: &watch::Sender<Option<T>>,
    token: CancellationToken,
) -> Result<(), JoinHandle<bool>> {
    // The borrow is a temporary of the `if` condition, so the read guard is
    // released before either branch runs.
    if sender.borrow().is_some() {
        return Ok(());
    }

    let mut rx = sender.subscribe();
    Err(tokio::spawn(async move {
        tokio::select! {
            // Poll the value branch first. The fast path above reports an
            // available value without looking at the token, so when a value
            // and a cancellation are both ready the value must win here too,
            // instead of the outcome being picked at random.
            biased;
            result = rx.wait_for(|v| v.is_some()) => result.is_ok(),
            _ = token.cancelled() => false,
        }
    }))
}
/// Counts outstanding [`TaskGuard`]s and lets callers wait until none remain.
pub struct TaskCounter {
    /// Number of guards created by [`TaskCounter::guard`] that are still alive.
    count: AtomicUsize,
    /// Signalled by `TaskGuard`'s `Drop` when `count` reaches zero.
    notify: Notify,
}

impl TaskCounter {
    /// Creates a counter with no outstanding guards, already wrapped in an
    /// `Arc` because guards keep a shared reference to it.
    pub fn new() -> Arc<Self> {
        let counter = Self {
            count: AtomicUsize::new(0),
            notify: Notify::new(),
        };
        Arc::new(counter)
    }

    /// Completes once the number of live guards is observed to be zero.
    ///
    /// Returns immediately if no guard is alive at the time of the check.
    pub async fn wait_for_zero(self: &Arc<Self>) {
        loop {
            // Create the `Notified` future *before* reading the counter. Per
            // tokio's documentation a `Notified` receives `notify_waiters()`
            // wakeups from the moment it is created, so a guard dropped
            // between the check and the await below is not missed.
            let wakeup = self.notify.notified();
            match self.count.load(Ordering::Acquire) {
                0 => break,
                // Still busy: sleep until the last guard signals, then
                // re-check, since new guards may have been created meanwhile.
                _ => wakeup.await,
            }
        }
    }

    /// Registers one more outstanding task and returns the guard whose drop
    /// unregisters it again.
    pub fn guard(self: &Arc<Self>) -> TaskGuard {
        let counter = Arc::clone(self);
        counter.count.fetch_add(1, Ordering::Relaxed);
        TaskGuard { counter }
    }
}
/// RAII token for one outstanding task tracked by a [`TaskCounter`].
///
/// The counter is incremented when the guard is created (see
/// [`TaskCounter::guard`]) and decremented again when the guard is dropped, so
/// the guard has to be kept alive for as long as the task should be counted.
#[must_use = "dropping a TaskGuard immediately decrements the counter again; bind it to a variable"]
pub struct TaskGuard {
    /// The counter this guard reports back to on drop.
    counter: Arc<TaskCounter>,
}

impl Drop for TaskGuard {
    /// Decrements the counter and, if this was the last live guard, wakes
    /// every task currently parked in `TaskCounter::wait_for_zero`.
    fn drop(&mut self) {
        // `fetch_sub` returns the value *before* the decrement, so `1` means
        // the count has just reached zero. `Release` pairs with the `Acquire`
        // load in `wait_for_zero`.
        if self.counter.count.fetch_sub(1, Ordering::Release) == 1 {
            self.counter.notify.notify_waiters();
        }
    }
}