susydev_jsonrpc_server_utils/
reactor.rs1use num_cpus;
5use std::sync::mpsc;
6use std::{io, thread};
7use tokio;
8
9use crate::core::futures::{self, Future};
10
11#[derive(Debug)]
13pub enum UninitializedExecutor {
14 Shared(tokio::runtime::TaskExecutor),
16 Unspawned,
18}
19
20impl UninitializedExecutor {
21 pub fn initialize(self) -> io::Result<Executor> {
25 self.init_with_name("event.loop")
26 }
27
28 pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
32 match self {
33 UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
34 UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
35 }
36 }
37}
38
39#[derive(Debug)]
41pub enum Executor {
42 Shared(tokio::runtime::TaskExecutor),
44 Spawned(RpcEventLoop),
46}
47
48impl Executor {
49 pub fn executor(&self) -> tokio::runtime::TaskExecutor {
51 match *self {
52 Executor::Shared(ref executor) => executor.clone(),
53 Executor::Spawned(ref eloop) => eloop.executor(),
54 }
55 }
56
57 pub fn spawn<F>(&self, future: F)
59 where
60 F: Future<Item = (), Error = ()> + Send + 'static,
61 {
62 self.executor().spawn(future)
63 }
64
65 pub fn close(self) {
67 if let Executor::Spawned(eloop) = self {
68 eloop.close()
69 }
70 }
71
72 pub fn wait(self) {
74 if let Executor::Spawned(eloop) = self {
75 let _ = eloop.wait();
76 }
77 }
78}
79
80#[derive(Debug)]
82pub struct RpcEventLoop {
83 executor: tokio::runtime::TaskExecutor,
84 close: Option<futures::Complete<()>>,
85 handle: Option<thread::JoinHandle<()>>,
86}
87
88impl Drop for RpcEventLoop {
89 fn drop(&mut self) {
90 self.close.take().map(|v| v.send(()));
91 }
92}
93
94impl RpcEventLoop {
95 pub fn spawn() -> io::Result<Self> {
97 RpcEventLoop::with_name(None)
98 }
99
100 pub fn with_name(name: Option<String>) -> io::Result<Self> {
102 let (stop, stopped) = futures::oneshot();
103 let (tx, rx) = mpsc::channel();
104 let mut tb = thread::Builder::new();
105 if let Some(name) = name {
106 tb = tb.name(name);
107 }
108
109 let handle = tb
110 .spawn(move || {
111 let core_threads = match num_cpus::get_physical() {
112 1 => 1,
113 2..=4 => 2,
114 _ => 3,
115 };
116
117 let runtime = tokio::runtime::Builder::new()
118 .core_threads(core_threads)
119 .name_prefix("susydev-jsonrpc-eventloop-")
120 .build();
121
122 match runtime {
123 Ok(mut runtime) => {
124 tx.send(Ok(runtime.executor())).expect("Rx is blocking upper thread.");
125 let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ());
126 runtime.spawn(terminate);
127 runtime.shutdown_on_idle().wait().unwrap();
128 }
129 Err(err) => {
130 tx.send(Err(err)).expect("Rx is blocking upper thread.");
131 }
132 }
133 })
134 .expect("Couldn't spawn a thread.");
135
136 let exec = rx.recv().expect("tx is transfered to a newly spawned thread.");
137
138 exec.map(|executor| RpcEventLoop {
139 executor,
140 close: Some(stop),
141 handle: Some(handle),
142 })
143 }
144
145 pub fn executor(&self) -> tokio::runtime::TaskExecutor {
147 self.executor.clone()
148 }
149
150 pub fn wait(mut self) -> thread::Result<()> {
152 self.handle
153 .take()
154 .expect("Handle is always set before self is consumed.")
155 .join()
156 }
157
158 pub fn close(mut self) {
160 let _ = self
161 .close
162 .take()
163 .expect("Close is always set before self is consumed.")
164 .send(())
165 .map_err(|e| {
166 warn!("Event Loop is already finished. {:?}", e);
167 });
168 }
169}