Skip to main content

spawned_rt/threads/
mod.rs

1//! IO-threads based module to support shared behavior with task based version.
2
3pub mod mpsc;
4pub mod oneshot;
5
6use std::sync::{
7    atomic::{AtomicBool, Ordering},
8    mpsc as std_mpsc, Arc, Mutex, OnceLock,
9};
10pub use std::{
11    future::Future,
12    thread::{sleep, spawn, JoinHandle},
13};
14
15use crate::{tasks::Runtime, tracing::init_tracing};
16
17/// Global list of Ctrl+C subscribers
18static CTRL_C_SUBSCRIBERS: OnceLock<Mutex<Vec<std_mpsc::Sender<()>>>> = OnceLock::new();
19
20/// Initialize tracing and run the given function.
21pub fn run(f: fn()) {
22    init_tracing();
23
24    f()
25}
26
27/// Create a temporary tokio runtime and block on the given future.
28pub fn block_on<F: Future>(future: F) -> F::Output {
29    let rt = Runtime::new().unwrap();
30    rt.block_on(future)
31}
32
33/// Spawn blocking is the same as spawn for pure threaded usage.
34pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
35where
36    F: FnOnce() -> R + Send + 'static,
37    R: Send + 'static,
38{
39    spawn(f)
40}
41
42type CancelCallback = Box<dyn FnOnce() + Send>;
43
44/// A token that can be used to signal cancellation.
45///
46/// Supports registering callbacks via `on_cancel()` that fire when
47/// the token is cancelled, enabling efficient waiting patterns.
48#[derive(Clone, Default)]
49pub struct CancellationToken {
50    is_cancelled: Arc<AtomicBool>,
51    callbacks: Arc<Mutex<Vec<CancelCallback>>>,
52}
53
54impl std::fmt::Debug for CancellationToken {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("CancellationToken")
57            .field("is_cancelled", &self.is_cancelled())
58            .finish()
59    }
60}
61
62impl CancellationToken {
63    pub fn new() -> Self {
64        CancellationToken {
65            is_cancelled: Arc::new(false.into()),
66            callbacks: Arc::new(Mutex::new(Vec::new())),
67        }
68    }
69
70    pub fn is_cancelled(&self) -> bool {
71        self.is_cancelled.load(Ordering::SeqCst)
72    }
73
74    pub fn cancel(&self) {
75        self.is_cancelled.store(true, Ordering::SeqCst);
76        // Fire all registered callbacks
77        let callbacks: Vec<_> = self
78            .callbacks
79            .lock()
80            .unwrap_or_else(|e| e.into_inner())
81            .drain(..)
82            .collect();
83        for cb in callbacks {
84            cb();
85        }
86    }
87
88    /// Register a callback to be invoked when this token is cancelled.
89    /// If already cancelled, the callback fires immediately.
90    ///
91    /// This method is thread-safe: the callback is guaranteed to fire exactly
92    /// once, either immediately (if already cancelled) or when `cancel()` is called.
93    pub fn on_cancel(&self, callback: CancelCallback) {
94        // Hold the lock while checking is_cancelled to avoid a race with cancel().
95        // cancel() sets the flag BEFORE acquiring the lock, so if we see
96        // is_cancelled=false while holding the lock, cancel() hasn't drained
97        // callbacks yet and will drain ours after we release the lock.
98        let mut callbacks = self.callbacks.lock().unwrap_or_else(|e| e.into_inner());
99        if self.is_cancelled() {
100            drop(callbacks);
101            callback();
102        } else {
103            callbacks.push(callback);
104        }
105    }
106}
107
108/// Returns a closure that blocks until Ctrl+C is received.
109///
110/// Multiple calls to this function are supported - each returns a closure that
111/// will be notified when Ctrl+C is pressed. This allows multiple actors to
112/// react to the same signal.
113///
114/// The signal handler is registered on the first call. Subsequent calls simply
115/// add new subscribers to the broadcast list.
116///
117/// # Example
118///
119/// ```ignore
120/// // Both actors will be notified on Ctrl+C
121/// send_message_on(actor1.clone(), rt::ctrl_c(), Msg::Shutdown);
122/// send_message_on(actor2.clone(), rt::ctrl_c(), Msg::Shutdown);
123/// ```
124pub fn ctrl_c() -> impl FnOnce() + Send + 'static {
125    // Initialize subscribers list and register handler on first call
126    let subscribers = CTRL_C_SUBSCRIBERS.get_or_init(|| {
127        ctrlc::set_handler(|| {
128            if let Some(subs) = CTRL_C_SUBSCRIBERS.get() {
129                let mut guard = subs.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
130                // Notify all subscribers and remove dead ones (where receiver was dropped)
131                guard.retain(|tx| tx.send(()).is_ok());
132            }
133        })
134        .expect("Ctrl+C handler already set. Use ctrl_c() instead of ctrlc::set_handler()");
135        Mutex::new(Vec::new())
136    });
137
138    // Create a new subscriber channel
139    let (tx, rx) = std_mpsc::channel();
140    subscribers
141        .lock()
142        .unwrap_or_else(|poisoned| poisoned.into_inner())
143        .push(tx);
144
145    move || {
146        let _ = rx.recv();
147    }
148}