ckb_async_runtime/
native.rs1use ckb_spawn::Spawn;
2use core::future::Future;
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::thread::available_parallelism;
5use tokio::runtime::{Builder, Handle as TokioHandle, Runtime};
6
7use tokio::sync::mpsc::{Receiver, Sender};
8use tokio::task::JoinHandle;
9
10#[derive(Debug, Clone)]
15pub struct Handle {
16 pub(crate) inner: TokioHandle,
17 guard: Option<Sender<()>>,
18}
19
20impl Handle {
21 pub fn new(inner: TokioHandle, guard: Option<Sender<()>>) -> Self {
23 Self { inner, guard }
24 }
25
26 pub fn drop_guard(&mut self) {
28 let _ = self.guard.take();
29 }
30}
31
32impl Handle {
33 pub fn enter<F, R>(&self, f: F) -> R
37 where
38 F: FnOnce() -> R,
39 {
40 let _enter = self.inner.enter();
41 f()
42 }
43
44 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
48 where
49 F: Future + Send + 'static,
50 F::Output: Send + 'static,
51 {
52 let tokio_task_guard = self.guard.clone();
53
54 self.inner.spawn(async move {
55 let _guard = tokio_task_guard;
58 future.await
59 })
60 }
61
62 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
64 self.inner.block_on(future)
65 }
66
67 pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
71 where
72 F: FnOnce() -> R + Send + 'static,
73 R: Send + 'static,
74 {
75 self.inner.spawn_blocking(f)
76 }
77
78 pub fn into_inner(self) -> TokioHandle {
80 self.inner
81 }
82}
83
84fn new_runtime(worker_num: Option<usize>) -> Runtime {
86 Builder::new_multi_thread()
87 .enable_all()
88 .worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into()))
89 .thread_name_fn(|| {
90 static ATOMIC_ID: AtomicU32 = AtomicU32::new(0);
91 let id = ATOMIC_ID
92 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
93 if n >= 999_999 {
106 Some(0)
107 } else {
108 Some(n + 1)
109 }
110 })
111 .expect("impossible since the above closure must return Some(number)");
112 format!("GlobalRt-{id}")
113 })
114 .build()
115 .expect("ckb runtime initialized")
116}
117
118pub fn new_global_runtime(worker_num: Option<usize>) -> (Handle, Receiver<()>, Runtime) {
120 let runtime = new_runtime(worker_num);
121 let handle = runtime.handle().clone();
122 let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1);
123
124 (Handle::new(handle, Some(guard)), handle_stop_rx, runtime)
125}
126
127pub fn new_background_runtime() -> Handle {
130 let runtime = new_runtime(None);
131 let handle = runtime.handle().clone();
132
133 let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) =
134 tokio::sync::mpsc::channel::<()>(1);
135 let _thread = std::thread::Builder::new()
136 .name("GlobalRtBuilder".to_string())
137 .spawn(move || {
138 let ret = runtime.block_on(async move { handle_stop_rx.recv().await });
139 ckb_logger::debug!("Global runtime finished {:?}", ret);
140 })
141 .expect("tokio runtime started");
142
143 Handle::new(handle, Some(guard))
144}
145
146impl Spawn for Handle {
147 fn spawn_task<F>(&self, future: F)
148 where
149 F: Future<Output = ()> + Send + 'static,
150 {
151 self.spawn(future);
152 }
153}