1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use ckb_logger::debug;
use ckb_spawn::Spawn;
use ckb_stop_handler::{SignalSender, StopHandler};
use core::future::Future;
use std::{
sync::atomic::{AtomicU32, Ordering},
thread,
};
use tokio::runtime::Builder;
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
pub use tokio;
#[derive(Debug, Clone)]
pub struct Handle {
pub(crate) inner: TokioHandle,
}
impl Handle {
pub fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
let _enter = self.inner.enter();
f()
}
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.inner.spawn(future)
}
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.inner.block_on(future)
}
pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.inner.spawn_blocking(f)
}
}
pub fn new_global_runtime() -> (Handle, StopHandler<()>) {
let runtime = Builder::new_multi_thread()
.enable_all()
.thread_name("GlobalRt")
.thread_name_fn(|| {
static ATOMIC_ID: AtomicU32 = AtomicU32::new(0);
let id = ATOMIC_ID
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
if n >= 999_999 {
Some(0)
} else {
Some(n + 1)
}
})
.expect("impossible since the above closure must return Some(number)");
format!("GlobalRt-{}", id)
})
.build()
.expect("ckb runtime initialized");
let handle = runtime.handle().clone();
let (tx, rx) = oneshot::channel();
let thread = thread::Builder::new()
.name("GlobalRtBuilder".to_string())
.spawn(move || {
let ret = runtime.block_on(rx);
debug!("global runtime finish {:?}", ret);
})
.expect("tokio runtime started");
(
Handle { inner: handle },
StopHandler::new(SignalSender::Tokio(tx), Some(thread)),
)
}
impl Spawn for Handle {
fn spawn_task<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.spawn(future);
}
}