tokio_compat/runtime/threadpool/builder.rs
1use super::{compat, Inner, Runtime};
2
3use std::{
4 io,
5 sync::{Arc, RwLock},
6};
7use tokio_02::runtime;
8use tokio_timer_02::clock as clock_02;
9
10/// Builds a compatibility runtime with custom configuration values.
11///
12/// This runtime is compatible with code using both the current release version
13/// of `tokio` (0.1) and with legacy code using `tokio` 0.1.
14///
15/// Methods can be chained in order to set the configuration values. The
16/// Runtime is constructed by calling [`build`].
17///
18/// New instances of `Builder` are obtained via [`Builder::new`].
19///
20/// See function level documentation for details on the various configuration
21/// settings.
22///
23/// [`build`]: #method.build
24/// [`Builder::new`]: #method.new
25///
26/// # Examples
27///
28/// ```
29///
30/// use tokio_compat::runtime::Builder;
31/// use tokio_timer_02::clock::Clock;
32///
33/// fn main() {
34/// // build Runtime
35/// let runtime = Builder::new()
36/// .clock(Clock::system())
37/// .core_threads(4)
38/// .name_prefix("my-custom-name-")
39/// .stack_size(3 * 1024 * 1024)
40/// .build()
41/// .unwrap();
42///
43/// // use runtime ...
44/// }
45/// ```
46#[derive(Debug)]
47#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
48pub struct Builder {
49 inner: runtime::Builder,
50 clock: clock_02::Clock,
51}
52
53impl Builder {
54 /// Returns a new runtime builder initialized with default configuration
55 /// values.
56 ///
57 /// Configuration methods can be chained on the return value.
58 pub fn new() -> Builder {
59 Builder {
60 clock: clock_02::Clock::system(),
61 inner: runtime::Builder::new(),
62 }
63 }
64
65 /// Set the `Clock` instance that will be used by the runtime's legacy timer.
66 pub fn clock(&mut self, clock: clock_02::Clock) -> &mut Self {
67 self.clock = clock;
68 self
69 }
70
71 /// Set the maximum number of worker threads for the `Runtime`'s thread pool.
72 ///
73 /// This must be a number between 1 and 32,768 though it is advised to keep
74 /// this value on the smaller side.
75 ///
76 /// The default value is the number of cores available to the system.
77 ///
78 /// # Examples
79 ///
80 /// ```
81 /// # use tokio_compat::runtime;
82 ///
83 /// # pub fn main() {
84 /// let rt = runtime::Builder::new()
85 /// .core_threads(4)
86 /// .build()
87 /// .unwrap();
88 /// # }
89 /// ```
90 pub fn core_threads(&mut self, val: usize) -> &mut Self {
91 // the deprecation warning states that this method will be replaced, but
92 // the method that will replace it doesn't exist yet...
93 #[allow(deprecated)]
94 self.inner.num_threads(val);
95 self
96 }
97
98 /// Set name prefix of threads spawned by the `Runtime`'s thread pool.
99 ///
100 /// Thread name prefix is used for generating thread names. For example, if
101 /// prefix is `my-pool-`, then threads in the pool will get names like
102 /// `my-pool-1` etc.
103 ///
104 /// The default prefix is "tokio-runtime-worker-".
105 ///
106 /// # Examples
107 ///
108 /// ```
109 /// # use tokio_compat::runtime;
110 ///
111 /// # pub fn main() {
112 /// let rt = runtime::Builder::new()
113 /// .name_prefix("my-pool-")
114 /// .build();
115 /// # }
116 /// ```
117 pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
118 self.inner.thread_name(val);
119 self
120 }
121
122 /// Set the stack size (in bytes) for worker threads.
123 ///
124 /// The actual stack size may be greater than this value if the platform
125 /// specifies minimal stack size.
126 ///
127 /// The default stack size for spawned threads is 2 MiB, though this
128 /// particular stack size is subject to change in the future.
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// # use tokio_compat::runtime;
134 ///
135 /// # pub fn main() {
136 /// let rt = runtime::Builder::new()
137 /// .stack_size(32 * 1024)
138 /// .build();
139 /// # }
140 /// ```
141 pub fn stack_size(&mut self, val: usize) -> &mut Self {
142 self.inner.thread_stack_size(val);
143 self
144 }
145
146 /// Create the configured `Runtime`.
147 ///
148 /// The returned `ThreadPool` instance is ready to spawn tasks.
149 ///
150 /// # Examples
151 ///
152 /// ```
153 /// # use tokio_compat::runtime::Builder;
154 /// # pub fn main() {
155 /// let runtime = Builder::new().build().unwrap();
156 /// // ... call runtime.run(...)
157 /// # let _ = runtime;
158 /// # }
159 /// ```
160 pub fn build(&mut self) -> io::Result<Runtime> {
161 let compat_bg = compat::Background::spawn(&self.clock)?;
162 let compat_timer = compat_bg.timer().clone();
163 let compat_reactor = compat_bg.reactor().clone();
164 let compat_sender: Arc<RwLock<Option<super::CompatSpawner<tokio_02::runtime::Handle>>>> =
165 Arc::new(RwLock::new(None));
166
167 // We need a weak ref here so that when we pass this into the
168 // `on_thread_start` closure it's stored as a _weak_ ref and not
169 // a strong one. This is important because tokio 0.2's runtime
170 // should shut down when the compat Runtime has been dropped. There
171 // can be a race condition where we hold onto an extra Arc, which holds
172 // a runtime handle beyond the drop. This causes the tokio 0.2 runtime to
173 // not shut down and leak fds.
174 //
175 // Tokio 0.2's threaded_scheduler will spawn threads that each contain an arced
176 // ref to the `on_thread_start` fn. If the runtime shuts down but there is still
177 // access to a runtime handle, the mio driver will not shutdown. To avoid this we
178 // only want the `on_thread_start` to hold a weak ref and attempt to upgrade, since
179 // the runtime will still be acitve the upgrade should work. Otherwise, somehow
180 // tokio started a new thread after its runtime has been dropped.
181 let compat_sender2 = Arc::downgrade(&compat_sender);
182 let mut lock = compat_sender.write().unwrap();
183
184 let runtime = self
185 .inner
186 .threaded_scheduler()
187 .enable_all()
188 .on_thread_start(move || {
189 // We need the threadpool's sender to set up the default tokio
190 // 0.1 executor. We also need to upgrade the weak pointer. If the
191 // pointer is no longer valid, then the runtime has shut down and the
192 // handle is no longer available.
193 //
194 // This upgrade will only fail if the compat runtime has been dropped.
195 let sender = compat_sender2
196 .upgrade()
197 .expect("Runtime dropped but thread started; this is a bug!");
198 let lock = sender.read().unwrap();
199 let compat_sender = lock
200 .as_ref()
201 .expect("compat executor needs to be set before the pool is run!")
202 .clone();
203 compat::set_guards(compat_sender, &compat_timer, &compat_reactor);
204 })
205 .on_thread_stop(|| {
206 compat::unset_guards();
207 })
208 .build()?;
209
210 let (idle, idle_rx) = super::idle::Idle::new();
211 // Set the tokio 0.1 executor to be used by the worker threads.
212 *lock = Some(super::CompatSpawner::new(runtime.handle().clone(), &idle));
213 drop(lock);
214 let runtime = Runtime {
215 inner: Some(Inner { runtime, compat_bg }),
216 idle_rx,
217 idle,
218 compat_sender,
219 };
220
221 Ok(runtime)
222 }
223}
224
225impl Default for Builder {
226 fn default() -> Self {
227 Self::new()
228 }
229}