tetsy_jsonrpc_server_utils/
reactor.rs1use std::io;
9use tokio;
10
11use crate::core::futures::{self, Future};
12
13#[derive(Debug)]
15pub enum UninitializedExecutor {
16 Shared(tokio::runtime::TaskExecutor),
18 Unspawned,
20}
21
22impl UninitializedExecutor {
23 pub fn initialize(self) -> io::Result<Executor> {
27 self.init_with_name("event.loop")
28 }
29
30 pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
34 match self {
35 UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
36 UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
37 }
38 }
39}
40
41#[derive(Debug)]
43pub enum Executor {
44 Shared(tokio::runtime::TaskExecutor),
46 Spawned(RpcEventLoop),
48}
49
50impl Executor {
51 pub fn executor(&self) -> tokio::runtime::TaskExecutor {
53 match *self {
54 Executor::Shared(ref executor) => executor.clone(),
55 Executor::Spawned(ref eloop) => eloop.executor(),
56 }
57 }
58
59 pub fn spawn<F>(&self, future: F)
61 where
62 F: Future<Item = (), Error = ()> + Send + 'static,
63 {
64 self.executor().spawn(future)
65 }
66
67 pub fn close(self) {
69 if let Executor::Spawned(eloop) = self {
70 eloop.close()
71 }
72 }
73
74 pub fn wait(self) {
76 if let Executor::Spawned(eloop) = self {
77 let _ = eloop.wait();
78 }
79 }
80}
81
82#[derive(Debug)]
84pub struct RpcEventLoop {
85 executor: tokio::runtime::TaskExecutor,
86 close: Option<futures::Complete<()>>,
87 handle: Option<tokio::runtime::Shutdown>,
88}
89
90impl Drop for RpcEventLoop {
91 fn drop(&mut self) {
92 self.close.take().map(|v| v.send(()));
93 }
94}
95
96impl RpcEventLoop {
97 pub fn spawn() -> io::Result<Self> {
99 RpcEventLoop::with_name(None)
100 }
101
102 pub fn with_name(name: Option<String>) -> io::Result<Self> {
104 let (stop, stopped) = futures::oneshot();
105
106 let mut tb = tokio::runtime::Builder::new();
107 tb.core_threads(1);
108
109 if let Some(name) = name {
110 tb.name_prefix(name);
111 }
112
113 let mut runtime = tb.build()?;
114 let executor = runtime.executor();
115 let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ());
116 runtime.spawn(terminate);
117 let handle = runtime.shutdown_on_idle();
118
119 Ok(RpcEventLoop {
120 executor,
121 close: Some(stop),
122 handle: Some(handle),
123 })
124 }
125
126 pub fn executor(&self) -> tokio::runtime::TaskExecutor {
128 self.executor.clone()
129 }
130
131 pub fn wait(mut self) -> Result<(), ()> {
133 self.handle.take().ok_or(())?.wait()
134 }
135
136 pub fn close(mut self) {
138 let _ = self
139 .close
140 .take()
141 .expect("Close is always set before self is consumed.")
142 .send(())
143 .map_err(|e| {
144 warn!("Event Loop is already finished. {:?}", e);
145 });
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::RpcEventLoop;

    /// Compile-time check that `RpcEventLoop` can be shared across threads.
    #[test]
    fn make_sure_rpc_event_loop_is_send_and_sync() {
        // Only instantiable for types that are both `Send` and `Sync`.
        fn assert_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        assert_send_sync::<RpcEventLoop>();
    }
}