ckb_stop_handler/
stop_register.rs1use ckb_channel::TrySendError;
2use ckb_logger::{debug, info, trace, warn};
3use ckb_util::Mutex;
4use std::sync::atomic::AtomicBool;
5use tokio_util::sync::CancellationToken;
6
7struct CkbServiceHandles {
8 thread_handles: Vec<(String, std::thread::JoinHandle<()>)>,
9}
10
11pub fn wait_all_ckb_services_exit() {
13 info!("Waiting exit signal...");
14 let exit_signal = new_crossbeam_exit_rx();
15 let _ = exit_signal.recv();
16 let mut handles = CKB_HANDLES.lock();
17 debug!("wait_all_ckb_services_exit waiting all threads to exit");
18 for (name, join_handle) in handles.thread_handles.drain(..) {
19 match join_handle.join() {
20 Ok(_) => {
21 info!("Waiting thread {} done.", name);
22 }
23 Err(e) => {
24 warn!("Waiting thread {}: ERROR: {:?}", name, e)
25 }
26 }
27 }
28 info!("All ckb threads have been stopped");
29}
30
31static CKB_HANDLES: std::sync::LazyLock<Mutex<CkbServiceHandles>> =
32 std::sync::LazyLock::new(|| {
33 Mutex::new(CkbServiceHandles {
34 thread_handles: vec![],
35 })
36 });
37
38static RECEIVED_STOP_SIGNAL: std::sync::LazyLock<AtomicBool> =
39 std::sync::LazyLock::new(AtomicBool::default);
40
41static TOKIO_EXIT: std::sync::LazyLock<CancellationToken> =
42 std::sync::LazyLock::new(CancellationToken::new);
43
44static CROSSBEAM_EXIT_SENDERS: std::sync::LazyLock<Mutex<Vec<ckb_channel::Sender<()>>>> =
45 std::sync::LazyLock::new(|| Mutex::new(vec![]));
46
47pub fn new_tokio_exit_rx() -> CancellationToken {
49 TOKIO_EXIT.clone()
50}
51
52pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> {
54 let (tx, rx) = ckb_channel::bounded(1);
55 CROSSBEAM_EXIT_SENDERS.lock().push(tx);
56 rx
57}
58
59pub fn has_received_stop_signal() -> bool {
61 RECEIVED_STOP_SIGNAL.load(std::sync::atomic::Ordering::SeqCst)
62}
63
64pub fn broadcast_exit_signals() {
66 debug!("Received exit signal; broadcasting exit signal to all threads");
67 RECEIVED_STOP_SIGNAL.store(true, std::sync::atomic::Ordering::SeqCst);
68 TOKIO_EXIT.cancel();
69 CROSSBEAM_EXIT_SENDERS
70 .lock()
71 .iter()
72 .for_each(|tx| match tx.try_send(()) {
73 Ok(_) => {}
74 Err(TrySendError::Full(_)) => info!("Ckb process has received exit signal"),
75 Err(TrySendError::Disconnected(_)) => {
76 debug!("broadcast thread: channel is disconnected")
77 }
78 });
79}
80
81pub fn register_thread(name: &str, thread_handle: std::thread::JoinHandle<()>) {
83 trace!("Registering thread {}", name);
84 CKB_HANDLES
85 .lock()
86 .thread_handles
87 .push((name.into(), thread_handle));
88 trace!("Thread registration completed {}", name);
89}