spawned_rt/threads/
mod.rs1pub mod mpsc;
4pub mod oneshot;
5
6use std::sync::{
7 atomic::{AtomicBool, Ordering},
8 mpsc as std_mpsc, Arc, Mutex, OnceLock,
9};
10pub use std::{
11 future::Future,
12 thread::{sleep, spawn, JoinHandle},
13};
14
15use crate::{tasks::Runtime, tracing::init_tracing};
16
/// Process-wide registry of Ctrl+C subscribers.
///
/// Initialised lazily by the first call to `ctrl_c()`. Each entry is the sending half of a
/// channel whose receiving half is owned by a closure returned from `ctrl_c()`.
static CTRL_C_SUBSCRIBERS: OnceLock<Mutex<Vec<std_mpsc::Sender<()>>>> = OnceLock::new();
19
20pub fn run(f: fn()) {
22 init_tracing();
23
24 f()
25}
26
27pub fn block_on<F: Future>(future: F) -> F::Output {
29 let rt = Runtime::new().unwrap();
30 rt.block_on(future)
31}
32
/// Runs `f` on a newly spawned thread and returns the handle used to join it.
///
/// This is a thin wrapper over [`std::thread::spawn`]; the closure's return value is obtained
/// through [`JoinHandle::join`].
pub fn spawn_blocking<F: FnOnce() -> R + Send + 'static, R: Send + 'static>(
    f: F,
) -> JoinHandle<R> {
    spawn(f)
}
41
/// Boxed one-shot callback invoked when a [`CancellationToken`] is cancelled.
type CancelCallback = Box<dyn FnOnce() + Send>;

/// A cancellation flag with one-shot callbacks, shareable across threads.
///
/// Cloning is cheap: every clone points at the same flag and the same callback list, so
/// cancelling through any clone is observed by all of them.
#[derive(Clone, Default)]
pub struct CancellationToken {
    is_cancelled: Arc<AtomicBool>,
    callbacks: Arc<Mutex<Vec<CancelCallback>>>,
}

impl std::fmt::Debug for CancellationToken {
    /// Shows only the cancellation state; the registered callbacks are not printable.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CancellationToken")
            .field("is_cancelled", &self.is_cancelled())
            .finish()
    }
}

impl CancellationToken {
    /// Creates a token that is not cancelled and has no callbacks registered.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` once [`cancel`](Self::cancel) has been called on this token or any clone.
    pub fn is_cancelled(&self) -> bool {
        self.is_cancelled.load(Ordering::SeqCst)
    }

    /// Marks the token as cancelled and runs every pending callback on the calling thread.
    ///
    /// Each callback runs at most once: a second `cancel` finds the list empty. A poisoned
    /// lock is recovered rather than propagated.
    pub fn cancel(&self) {
        // Set the flag before touching the list so `on_cancel` can rely on the ordering.
        self.is_cancelled.store(true, Ordering::SeqCst);

        // Move the whole buffer out instead of copying it; the guard is a temporary, so the
        // lock is released before any callback runs and callbacks may re-enter the token.
        let pending =
            std::mem::take(&mut *self.callbacks.lock().unwrap_or_else(|e| e.into_inner()));

        for callback in pending {
            callback();
        }
    }

    /// Registers `callback` to run when the token is cancelled.
    ///
    /// If the token is already cancelled, `callback` runs immediately on the calling thread
    /// instead of being queued.
    pub fn on_cancel(&self, callback: CancelCallback) {
        // Take the lock before reading the flag: `cancel` sets the flag and then takes this
        // same lock to empty the list, so the callback is either queued in time to be picked
        // up there or sees the flag here.
        let mut callbacks = self.callbacks.lock().unwrap_or_else(|e| e.into_inner());
        if self.is_cancelled() {
            // Release the lock first so the callback can use the token without deadlocking.
            drop(callbacks);
            callback();
        } else {
            callbacks.push(callback);
        }
    }
}
107
108pub fn ctrl_c() -> impl FnOnce() + Send + 'static {
125 let subscribers = CTRL_C_SUBSCRIBERS.get_or_init(|| {
127 ctrlc::set_handler(|| {
128 if let Some(subs) = CTRL_C_SUBSCRIBERS.get() {
129 let mut guard = subs.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
130 guard.retain(|tx| tx.send(()).is_ok());
132 }
133 })
134 .expect("Ctrl+C handler already set. Use ctrl_c() instead of ctrlc::set_handler()");
135 Mutex::new(Vec::new())
136 });
137
138 let (tx, rx) = std_mpsc::channel();
140 subscribers
141 .lock()
142 .unwrap_or_else(|poisoned| poisoned.into_inner())
143 .push(tx);
144
145 move || {
146 let _ = rx.recv();
147 }
148}