bitconch_jsonrpc_server_utils/
reactor.rs1use std::{io, thread};
5use std::sync::mpsc;
6use tokio;
7use num_cpus;
8
9use core::futures::{self, Future};
10
11#[derive(Debug)]
13pub enum UninitializedExecutor {
14 Shared(tokio::runtime::TaskExecutor),
16 Unspawned,
18}
19
20impl UninitializedExecutor {
21 pub fn initialize(self) -> io::Result<Executor> {
25 self.init_with_name("event.loop")
26 }
27
28 pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
32 match self {
33 UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
34 UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
35 }
36 }
37}
38
39#[derive(Debug)]
41pub enum Executor {
42 Shared(tokio::runtime::TaskExecutor),
44 Spawned(RpcEventLoop),
46}
47
48impl Executor {
49 pub fn executor(&self) -> tokio::runtime::TaskExecutor {
51 match *self {
52 Executor::Shared(ref executor) => executor.clone(),
53 Executor::Spawned(ref eloop) => eloop.executor(),
54 }
55 }
56
57 pub fn spawn<F>(&self, future: F)
59 where
60 F: Future<Item = (), Error = ()> + Send + 'static,
61 {
62 self.executor().spawn(future)
63 }
64
65 pub fn close(self) {
67 if let Executor::Spawned(eloop) = self {
68 eloop.close()
69 }
70 }
71
72 pub fn wait(self) {
74 if let Executor::Spawned(eloop) = self {
75 let _ = eloop.wait();
76 }
77 }
78}
79
80#[derive(Debug)]
82pub struct RpcEventLoop {
83 executor: tokio::runtime::TaskExecutor,
84 close: Option<futures::Complete<()>>,
85 handle: Option<thread::JoinHandle<()>>,
86}
87
88impl Drop for RpcEventLoop {
89 fn drop(&mut self) {
90 self.close.take().map(|v| v.send(()));
91 }
92}
93
94impl RpcEventLoop {
95 pub fn spawn() -> io::Result<Self> {
97 RpcEventLoop::with_name(None)
98 }
99
100 pub fn with_name(name: Option<String>) -> io::Result<Self> {
102 let (stop, stopped) = futures::oneshot();
103 let (tx, rx) = mpsc::channel();
104 let mut tb = thread::Builder::new();
105 if let Some(name) = name {
106 tb = tb.name(name);
107 }
108
109 let handle = tb.spawn(move || {
110 let mut tp_builder = tokio::executor::thread_pool::Builder::new();
111
112 let pool_size = match num_cpus::get_physical() {
113 1 => 1,
114 2...4 => 2,
115 _ => 3,
116 };
117
118 tp_builder
119 .pool_size(pool_size)
120 .name_prefix("jsonrpc-eventloop-");
121
122 let runtime = tokio::runtime::Builder::new()
123 .threadpool_builder(tp_builder)
124 .build();
125
126 match runtime {
127 Ok(mut runtime) => {
128 tx.send(Ok(runtime.executor())).expect("Rx is blocking upper thread.");
129 let terminate = futures::empty().select(stopped)
130 .map(|_| ())
131 .map_err(|_| ());
132 runtime.spawn(terminate);
133 runtime.shutdown_on_idle().wait().unwrap();
134 },
135 Err(err) => {
136 tx.send(Err(err)).expect("Rx is blocking upper thread.");
137 }
138 }
139 }).expect("Couldn't spawn a thread.");
140
141 let exec = rx.recv().expect("tx is transfered to a newly spawned thread.");
142
143 exec.map(|executor| RpcEventLoop {
144 executor,
145 close: Some(stop),
146 handle: Some(handle),
147 })
148 }
149
150 pub fn executor(&self) -> tokio::runtime::TaskExecutor {
152 self.executor.clone()
153 }
154
155 pub fn wait(mut self) -> thread::Result<()> {
157 self.handle.take().expect("Handle is always set before self is consumed.").join()
158 }
159
160 pub fn close(mut self) {
162 let _ = self.close.take().expect("Close is always set before self is consumed.").send(()).map_err(|e| {
163 warn!("Event Loop is already finished. {:?}", e);
164 });
165 }
166}