ckb_stop_handler/
stop_register.rs

1use ckb_channel::TrySendError;
2use ckb_logger::{debug, info, trace, warn};
3use ckb_util::Mutex;
4use std::sync::atomic::AtomicBool;
5use tokio_util::sync::CancellationToken;
6
7struct CkbServiceHandles {
8    thread_handles: Vec<(String, std::thread::JoinHandle<()>)>,
9}
10
11/// Wait all ckb services exit
12pub fn wait_all_ckb_services_exit() {
13    info!("Waiting exit signal...");
14    let exit_signal = new_crossbeam_exit_rx();
15    let _ = exit_signal.recv();
16    let mut handles = CKB_HANDLES.lock();
17    debug!("wait_all_ckb_services_exit waiting all threads to exit");
18    for (name, join_handle) in handles.thread_handles.drain(..) {
19        match join_handle.join() {
20            Ok(_) => {
21                info!("Waiting thread {} done.", name);
22            }
23            Err(e) => {
24                warn!("Waiting thread {}: ERROR: {:?}", name, e)
25            }
26        }
27    }
28    info!("All ckb threads have been stopped");
29}
30
31static CKB_HANDLES: std::sync::LazyLock<Mutex<CkbServiceHandles>> =
32    std::sync::LazyLock::new(|| {
33        Mutex::new(CkbServiceHandles {
34            thread_handles: vec![],
35        })
36    });
37
38static RECEIVED_STOP_SIGNAL: std::sync::LazyLock<AtomicBool> =
39    std::sync::LazyLock::new(AtomicBool::default);
40
41static TOKIO_EXIT: std::sync::LazyLock<CancellationToken> =
42    std::sync::LazyLock::new(CancellationToken::new);
43
44static CROSSBEAM_EXIT_SENDERS: std::sync::LazyLock<Mutex<Vec<ckb_channel::Sender<()>>>> =
45    std::sync::LazyLock::new(|| Mutex::new(vec![]));
46
47/// Create a new CancellationToken for exit signal
48pub fn new_tokio_exit_rx() -> CancellationToken {
49    TOKIO_EXIT.clone()
50}
51
52/// Create a new crossbeam Receiver for exit signal
53pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> {
54    let (tx, rx) = ckb_channel::bounded(1);
55    CROSSBEAM_EXIT_SENDERS.lock().push(tx);
56    rx
57}
58
59/// Check if the ckb process has received stop signal
60pub fn has_received_stop_signal() -> bool {
61    RECEIVED_STOP_SIGNAL.load(std::sync::atomic::Ordering::SeqCst)
62}
63
64/// Broadcast exit signals to all threads and all tokio tasks
65pub fn broadcast_exit_signals() {
66    debug!("Received exit signal; broadcasting exit signal to all threads");
67    RECEIVED_STOP_SIGNAL.store(true, std::sync::atomic::Ordering::SeqCst);
68    TOKIO_EXIT.cancel();
69    CROSSBEAM_EXIT_SENDERS
70        .lock()
71        .iter()
72        .for_each(|tx| match tx.try_send(()) {
73            Ok(_) => {}
74            Err(TrySendError::Full(_)) => info!("Ckb process has received exit signal"),
75            Err(TrySendError::Disconnected(_)) => {
76                debug!("broadcast thread: channel is disconnected")
77            }
78        });
79}
80
81/// Register a thread `JoinHandle` to `CKB_HANDLES`
82pub fn register_thread(name: &str, thread_handle: std::thread::JoinHandle<()>) {
83    trace!("Registering thread {}", name);
84    CKB_HANDLES
85        .lock()
86        .thread_handles
87        .push((name.into(), thread_handle));
88    trace!("Thread registration completed {}", name);
89}