ckb_async_runtime/
native.rs

1use ckb_spawn::Spawn;
2use core::future::Future;
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::thread::available_parallelism;
5use tokio::runtime::{Builder, Handle as TokioHandle, Runtime};
6
7use tokio::sync::mpsc::{Receiver, Sender};
8use tokio::task::JoinHandle;
9
10// Handle is a newtype wrap and unwrap tokio::Handle, it is workaround with Rust Orphan Rules.
11// We need `Handle` impl ckb spawn trait decouple tokio dependence
12
13/// Handle to the runtime.
14#[derive(Debug, Clone)]
15pub struct Handle {
16    pub(crate) inner: TokioHandle,
17    guard: Option<Sender<()>>,
18}
19
20impl Handle {
21    /// Create a new Handle
22    pub fn new(inner: TokioHandle, guard: Option<Sender<()>>) -> Self {
23        Self { inner, guard }
24    }
25
26    /// Drop the guard
27    pub fn drop_guard(&mut self) {
28        let _ = self.guard.take();
29    }
30}
31
32impl Handle {
33    /// Enter the runtime context. This allows you to construct types that must
34    /// have an executor available on creation such as [`tokio::time::Sleep`] or [`tokio::net::TcpStream`].
35    /// It will also allow you to call methods such as [`tokio::spawn`].
36    pub fn enter<F, R>(&self, f: F) -> R
37    where
38        F: FnOnce() -> R,
39    {
40        let _enter = self.inner.enter();
41        f()
42    }
43
44    /// Spawns a future onto the runtime.
45    ///
46    /// This spawns the given future onto the runtime's executor
47    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
48    where
49        F: Future + Send + 'static,
50        F::Output: Send + 'static,
51    {
52        let tokio_task_guard = self.guard.clone();
53
54        self.inner.spawn(async move {
55            // move tokio_task_guard into the spawned future
56            // so that it will be dropped when the future is finished
57            let _guard = tokio_task_guard;
58            future.await
59        })
60    }
61
62    /// Run a future to completion on the Tokio runtime from a synchronous context.
63    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
64        self.inner.block_on(future)
65    }
66
67    /// Spawns a future onto the runtime blocking pool.
68    ///
69    /// This spawns the given future onto the runtime's blocking executor
70    pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
71    where
72        F: FnOnce() -> R + Send + 'static,
73        R: Send + 'static,
74    {
75        self.inner.spawn_blocking(f)
76    }
77
78    /// Transform to inner tokio handler
79    pub fn into_inner(self) -> TokioHandle {
80        self.inner
81    }
82}
83
84/// Create a new runtime with unique name.
85fn new_runtime(worker_num: Option<usize>) -> Runtime {
86    Builder::new_multi_thread()
87        .enable_all()
88        .worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into()))
89        .thread_name_fn(|| {
90            static ATOMIC_ID: AtomicU32 = AtomicU32::new(0);
91            let id = ATOMIC_ID
92                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
93                    // A long thread name will cut to 15 characters in debug tools.
94                    // Such as "top", "htop", "gdb" and so on.
95                    // It's a kernel limit.
96                    //
97                    // So if we want to see the whole name in debug tools,
98                    // this number should have 6 digits at most,
99                    // since the prefix uses 9 characters in below code.
100                    //
101                    // There still has a issue:
102                    // When id wraps around, we couldn't know whether the old id
103                    // is released or not.
104                    // But we can ignore this, because it's almost impossible.
105                    if n >= 999_999 {
106                        Some(0)
107                    } else {
108                        Some(n + 1)
109                    }
110                })
111                .expect("impossible since the above closure must return Some(number)");
112            format!("GlobalRt-{id}")
113        })
114        .build()
115        .expect("ckb runtime initialized")
116}
117
118/// Create new threaded_scheduler tokio Runtime, return `Runtime`
119pub fn new_global_runtime(worker_num: Option<usize>) -> (Handle, Receiver<()>, Runtime) {
120    let runtime = new_runtime(worker_num);
121    let handle = runtime.handle().clone();
122    let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1);
123
124    (Handle::new(handle, Some(guard)), handle_stop_rx, runtime)
125}
126
127/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle,
128/// NOTICE: This is only used in testing
129pub fn new_background_runtime() -> Handle {
130    let runtime = new_runtime(None);
131    let handle = runtime.handle().clone();
132
133    let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) =
134        tokio::sync::mpsc::channel::<()>(1);
135    let _thread = std::thread::Builder::new()
136        .name("GlobalRtBuilder".to_string())
137        .spawn(move || {
138            let ret = runtime.block_on(async move { handle_stop_rx.recv().await });
139            ckb_logger::debug!("Global runtime finished {:?}", ret);
140        })
141        .expect("tokio runtime started");
142
143    Handle::new(handle, Some(guard))
144}
145
146impl Spawn for Handle {
147    fn spawn_task<F>(&self, future: F)
148    where
149        F: Future<Output = ()> + Send + 'static,
150    {
151        self.spawn(future);
152    }
153}