1use futures::compat::*;
20use futures01::{Future as Future01, IntoFuture as IntoFuture01};
21use std::{fmt, future::Future, thread};
22pub use tokio_compat::runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime, TaskExecutor};
23
/// Futures runtime backed by a Tokio runtime that is driven on a dedicated thread.
///
/// Holds the [`Executor`] used to spawn work and the [`RuntimeHandle`] that controls the
/// lifetime of the background thread.
pub struct Runtime {
    /// Executor handed out (by clone) to callers via `executor()`.
    executor: Executor,
    /// Stop channel and join handle of the thread driving the runtime.
    handle: RuntimeHandle,
}
31
/// Panic message passed to `expect` wherever a Tokio runtime is built in this file.
const RUNTIME_BUILD_PROOF: &str =
    "Building a Tokio runtime will only fail when mio components cannot be initialized (catastrophic)";
34
35impl Runtime {
36 fn new(runtime_bldr: &mut TokioRuntimeBuilder) -> Self {
37 let mut runtime = runtime_bldr.build().expect(RUNTIME_BUILD_PROOF);
38
39 let (stop, stopped) = tokio::sync::oneshot::channel();
40 let (tx, rx) = std::sync::mpsc::channel();
41 let handle = thread::spawn(move || {
42 let executor = runtime.executor();
43 runtime.block_on_std(async move {
44 tx.send(executor).expect("Rx is blocking upper thread.");
45 let _ = stopped.await;
46 });
47 });
48 let executor = rx.recv().expect("tx is transfered to a newly spawned thread.");
49
50 Runtime {
51 executor: Executor { inner: Mode::Tokio(executor) },
52 handle: RuntimeHandle { close: Some(stop), handle: Some(handle) },
53 }
54 }
55
56 pub fn with_default_thread_count() -> Self {
60 let mut runtime_bldr = TokioRuntimeBuilder::new();
61 Self::new(&mut runtime_bldr)
62 }
63
64 #[cfg(any(test, feature = "test-helpers"))]
68 pub fn with_thread_count(thread_count: usize) -> Self {
69 let mut runtime_bldr = TokioRuntimeBuilder::new();
70 runtime_bldr.core_threads(thread_count);
71
72 Self::new(&mut runtime_bldr)
73 }
74
75 #[cfg(any(test, feature = "test-helpers"))]
77 pub fn raw_executor(&self) -> TaskExecutor {
78 if let Mode::Tokio(ref executor) = self.executor.inner {
79 executor.clone()
80 } else {
81 panic!("Runtime is not initialized in Tokio mode.")
82 }
83 }
84
85 pub fn executor(&self) -> Executor {
87 self.executor.clone()
88 }
89}
90
/// Strategy an [`Executor`] uses to run the futures handed to it.
#[derive(Clone)]
enum Mode {
    /// Futures are spawned onto the wrapped Tokio task executor.
    Tokio(TaskExecutor),
    /// Each future is run to completion on the calling thread via `block_on`.
    #[allow(dead_code)]
    Sync,
    /// Each future is run via `block_on` on its own newly spawned thread.
    #[allow(dead_code)]
    ThreadPerFuture,
}
101
102impl fmt::Debug for Mode {
103 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
104 use self::Mode::*;
105
106 match *self {
107 Tokio(_) => write!(fmt, "tokio"),
108 Sync => write!(fmt, "synchronous"),
109 ThreadPerFuture => write!(fmt, "thread per future"),
110 }
111 }
112}
113
114fn block_on<F: Future<Output = ()> + Send + 'static>(r: F) {
115 tokio::runtime::Builder::new().enable_all().basic_scheduler().build().expect(RUNTIME_BUILD_PROOF).block_on(r)
116}
117
/// Cloneable handle for spawning futures; how they are run depends on the wrapped [`Mode`].
#[derive(Debug, Clone)]
pub struct Executor {
    /// Execution strategy used by `spawn`, `spawn_std` and `execute`.
    inner: Mode,
}
122
123impl Executor {
124 #[cfg(any(test, feature = "test-helpers"))]
126 pub fn new_sync() -> Self {
127 Executor { inner: Mode::Sync }
128 }
129
130 #[cfg(any(test, feature = "test-helpers"))]
132 pub fn new_thread_per_future() -> Self {
133 Executor { inner: Mode::ThreadPerFuture }
134 }
135
136 pub fn spawn<R>(&self, r: R)
138 where
139 R: IntoFuture01<Item = (), Error = ()> + Send + 'static,
140 R::Future: Send + 'static,
141 {
142 self.spawn_std(async move {
143 let _ = r.into_future().compat().await;
144 })
145 }
146
147 pub fn spawn_std<R>(&self, r: R)
149 where
150 R: Future<Output = ()> + Send + 'static,
151 {
152 match &self.inner {
153 Mode::Tokio(executor) => {
154 let _ = executor.spawn_handle_std(r);
155 }
156 Mode::Sync => block_on(r),
157 Mode::ThreadPerFuture => {
158 thread::spawn(move || block_on(r));
159 }
160 }
161 }
162}
163
164impl<F: Future01<Item = (), Error = ()> + Send + 'static> futures01::future::Executor<F> for Executor {
165 fn execute(&self, future: F) -> Result<(), futures01::future::ExecuteError<F>> {
166 match &self.inner {
167 Mode::Tokio(executor) => executor.execute(future),
168 Mode::Sync => {
169 block_on(async move {
170 let _ = future.compat().await;
171 });
172 Ok(())
173 }
174 Mode::ThreadPerFuture => {
175 thread::spawn(move || {
176 block_on(async move {
177 let _ = future.compat().await;
178 })
179 });
180 Ok(())
181 }
182 }
183 }
184}
185
/// Handle controlling the background thread that drives a [`Runtime`].
///
/// Dropping the handle sends the stop signal if it has not been sent already.
pub struct RuntimeHandle {
    /// Sender of the stop signal; `None` once the signal has been sent.
    close: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle of the runtime thread; `None` once taken by `wait`.
    handle: Option<thread::JoinHandle<()>>,
}
191
192impl From<Runtime> for RuntimeHandle {
193 fn from(el: Runtime) -> Self {
194 el.handle
195 }
196}
197
198impl Drop for RuntimeHandle {
199 fn drop(&mut self) {
200 self.close.take().map(|v| v.send(()));
201 }
202}
203
204impl RuntimeHandle {
205 pub fn wait(mut self) -> thread::Result<()> {
207 self.handle.take().expect("Handle is taken only in `wait`, `wait` is consuming; qed").join()
208 }
209
210 pub fn close(mut self) {
212 let _ =
213 self.close.take().expect("Close is taken only in `close` and `drop`. `close` is consuming; qed").send(());
214 }
215}