pub mod mpsc;
pub mod oneshot;
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc as std_mpsc, Arc, Mutex, OnceLock,
};
pub use std::{
future::Future,
thread::{sleep, spawn, JoinHandle},
};
use crate::{tasks::Runtime, tracing::init_tracing};
static CTRL_C_SUBSCRIBERS: OnceLock<Mutex<Vec<std_mpsc::Sender<()>>>> = OnceLock::new();
pub fn run(f: fn()) {
init_tracing();
f()
}
pub fn block_on<F: Future>(future: F) -> F::Output {
let rt = Runtime::new().unwrap();
rt.block_on(future)
}
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
spawn(f)
}
type CancelCallback = Box<dyn FnOnce() + Send>;
#[derive(Clone, Default)]
pub struct CancellationToken {
is_cancelled: Arc<AtomicBool>,
callbacks: Arc<Mutex<Vec<CancelCallback>>>,
}
impl std::fmt::Debug for CancellationToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CancellationToken")
.field("is_cancelled", &self.is_cancelled())
.finish()
}
}
impl CancellationToken {
pub fn new() -> Self {
CancellationToken {
is_cancelled: Arc::new(false.into()),
callbacks: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn is_cancelled(&self) -> bool {
self.is_cancelled.load(Ordering::SeqCst)
}
pub fn cancel(&self) {
self.is_cancelled.store(true, Ordering::SeqCst);
let callbacks: Vec<_> = self
.callbacks
.lock()
.unwrap_or_else(|e| e.into_inner())
.drain(..)
.collect();
for cb in callbacks {
cb();
}
}
pub fn on_cancel(&self, callback: CancelCallback) {
let mut callbacks = self.callbacks.lock().unwrap_or_else(|e| e.into_inner());
if self.is_cancelled() {
drop(callbacks);
callback();
} else {
callbacks.push(callback);
}
}
}
pub fn ctrl_c() -> impl FnOnce() + Send + 'static {
let subscribers = CTRL_C_SUBSCRIBERS.get_or_init(|| {
ctrlc::set_handler(|| {
if let Some(subs) = CTRL_C_SUBSCRIBERS.get() {
let mut guard = subs.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
guard.retain(|tx| tx.send(()).is_ok());
}
})
.expect("Ctrl+C handler already set. Use ctrl_c() instead of ctrlc::set_handler()");
Mutex::new(Vec::new())
});
let (tx, rx) = std_mpsc::channel();
subscribers
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.push(tx);
move || {
let _ = rx.recv();
}
}