rs_jsonrpc_server_utils/
reactor.rs1use std::{io, thread};
5use std::sync::mpsc;
6use tokio_core;
7
8use core::futures::{self, Future};
9
10#[derive(Debug)]
12pub enum UninitializedRemote {
13 Shared(tokio_core::reactor::Remote),
15 Unspawned,
17}
18
19impl UninitializedRemote {
20 pub fn initialize(self) -> io::Result<Remote> {
24 self.init_with_name("event.loop")
25 }
26
27 pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Remote> {
31 match self {
32 UninitializedRemote::Shared(remote) => Ok(Remote::Shared(remote)),
33 UninitializedRemote::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Remote::Spawned),
34 }
35 }
36}
37
38#[derive(Debug)]
40pub enum Remote {
41 Shared(tokio_core::reactor::Remote),
43 Spawned(RpcEventLoop),
45}
46
47impl Remote {
48 pub fn remote(&self) -> tokio_core::reactor::Remote {
50 match *self {
51 Remote::Shared(ref remote) => remote.clone(),
52 Remote::Spawned(ref eloop) => eloop.remote(),
53 }
54 }
55
56 pub fn close(self) {
58 if let Remote::Spawned(eloop) = self {
59 eloop.close()
60 }
61 }
62
63 pub fn wait(self) {
65 if let Remote::Spawned(eloop) = self {
66 let _ = eloop.wait();
67 }
68 }
69}
70
71#[derive(Debug)]
73pub struct RpcEventLoop {
74 remote: tokio_core::reactor::Remote,
75 close: Option<futures::Complete<()>>,
76 handle: Option<thread::JoinHandle<()>>,
77}
78
79impl Drop for RpcEventLoop {
80 fn drop(&mut self) {
81 self.close.take().map(|v| v.send(()));
82 }
83}
84
85impl RpcEventLoop {
86 pub fn spawn() -> io::Result<Self> {
88 RpcEventLoop::with_name(None)
89 }
90
91 pub fn with_name(name: Option<String>) -> io::Result<Self> {
93 let (stop, stopped) = futures::oneshot();
94 let (tx, rx) = mpsc::channel();
95 let mut tb = thread::Builder::new();
96 if let Some(name) = name {
97 tb = tb.name(name);
98 }
99 let handle = tb.spawn(move || {
100 let el = tokio_core::reactor::Core::new();
101 match el {
102 Ok(mut el) => {
103 tx.send(Ok(el.remote())).expect("Rx is blocking upper thread.");
104 let _ = el.run(futures::empty().select(stopped));
105 },
106 Err(err) => {
107 tx.send(Err(err)).expect("Rx is blocking upper thread.");
108 }
109 }
110 }).expect("Couldn't spawn a thread.");
111 let remote = rx.recv().expect("tx is transfered to a newly spawned thread.");
112
113 remote.map(|remote| RpcEventLoop {
114 remote: remote,
115 close: Some(stop),
116 handle: Some(handle),
117 })
118 }
119
120 pub fn remote(&self) -> tokio_core::reactor::Remote {
122 self.remote.clone()
123 }
124
125 pub fn wait(mut self) -> thread::Result<()> {
127 self.handle.take().expect("Handle is always set before self is consumed.").join()
128 }
129
130 pub fn close(mut self) {
132 let _ = self.close.take().expect("Close is always set before self is consumed.").send(()).map_err(|e| {
133 warn!("Event Loop is already finished. {:?}", e);
134 });
135 }
136}