tokio_compat/runtime/threadpool/
builder.rs

1use super::{compat, Inner, Runtime};
2
3use std::{
4    io,
5    sync::{Arc, RwLock},
6};
7use tokio_02::runtime;
8use tokio_timer_02::clock as clock_02;
9
10/// Builds a compatibility runtime with custom configuration values.
11///
12/// This runtime is compatible with code using both the current release version
13/// of `tokio` (0.1) and with legacy code using `tokio` 0.1.
14///
15/// Methods can be chained in order to set the configuration values. The
16/// Runtime is constructed by calling [`build`].
17///
18/// New instances of `Builder` are obtained via [`Builder::new`].
19///
20/// See function level documentation for details on the various configuration
21/// settings.
22///
23/// [`build`]: #method.build
24/// [`Builder::new`]: #method.new
25///
26/// # Examples
27///
28/// ```
29///
30/// use tokio_compat::runtime::Builder;
31/// use tokio_timer_02::clock::Clock;
32///
33/// fn main() {
34///     // build Runtime
35///     let runtime = Builder::new()
36///         .clock(Clock::system())
37///         .core_threads(4)
38///         .name_prefix("my-custom-name-")
39///         .stack_size(3 * 1024 * 1024)
40///         .build()
41///         .unwrap();
42///
43///     // use runtime ...
44/// }
45/// ```
46#[derive(Debug)]
47#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
48pub struct Builder {
49    inner: runtime::Builder,
50    clock: clock_02::Clock,
51}
52
53impl Builder {
54    /// Returns a new runtime builder initialized with default configuration
55    /// values.
56    ///
57    /// Configuration methods can be chained on the return value.
58    pub fn new() -> Builder {
59        Builder {
60            clock: clock_02::Clock::system(),
61            inner: runtime::Builder::new(),
62        }
63    }
64
65    /// Set the `Clock` instance that will be used by the runtime's legacy timer.
66    pub fn clock(&mut self, clock: clock_02::Clock) -> &mut Self {
67        self.clock = clock;
68        self
69    }
70
71    /// Set the maximum number of worker threads for the `Runtime`'s thread pool.
72    ///
73    /// This must be a number between 1 and 32,768 though it is advised to keep
74    /// this value on the smaller side.
75    ///
76    /// The default value is the number of cores available to the system.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// # use tokio_compat::runtime;
82    ///
83    /// # pub fn main() {
84    /// let rt = runtime::Builder::new()
85    ///     .core_threads(4)
86    ///     .build()
87    ///     .unwrap();
88    /// # }
89    /// ```
90    pub fn core_threads(&mut self, val: usize) -> &mut Self {
91        // the deprecation warning states that this method will be replaced, but
92        // the method that will replace it doesn't exist yet...
93        #[allow(deprecated)]
94        self.inner.num_threads(val);
95        self
96    }
97
98    /// Set name prefix of threads spawned by the `Runtime`'s thread pool.
99    ///
100    /// Thread name prefix is used for generating thread names. For example, if
101    /// prefix is `my-pool-`, then threads in the pool will get names like
102    /// `my-pool-1` etc.
103    ///
104    /// The default prefix is "tokio-runtime-worker-".
105    ///
106    /// # Examples
107    ///
108    /// ```
109    /// # use tokio_compat::runtime;
110    ///
111    /// # pub fn main() {
112    /// let rt = runtime::Builder::new()
113    ///     .name_prefix("my-pool-")
114    ///     .build();
115    /// # }
116    /// ```
117    pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
118        self.inner.thread_name(val);
119        self
120    }
121
122    /// Set the stack size (in bytes) for worker threads.
123    ///
124    /// The actual stack size may be greater than this value if the platform
125    /// specifies minimal stack size.
126    ///
127    /// The default stack size for spawned threads is 2 MiB, though this
128    /// particular stack size is subject to change in the future.
129    ///
130    /// # Examples
131    ///
132    /// ```
133    /// # use tokio_compat::runtime;
134    ///
135    /// # pub fn main() {
136    /// let rt = runtime::Builder::new()
137    ///     .stack_size(32 * 1024)
138    ///     .build();
139    /// # }
140    /// ```
141    pub fn stack_size(&mut self, val: usize) -> &mut Self {
142        self.inner.thread_stack_size(val);
143        self
144    }
145
146    /// Create the configured `Runtime`.
147    ///
148    /// The returned `ThreadPool` instance is ready to spawn tasks.
149    ///
150    /// # Examples
151    ///
152    /// ```
153    /// # use tokio_compat::runtime::Builder;
154    /// # pub fn main() {
155    /// let runtime = Builder::new().build().unwrap();
156    /// // ... call runtime.run(...)
157    /// # let _ = runtime;
158    /// # }
159    /// ```
160    pub fn build(&mut self) -> io::Result<Runtime> {
161        let compat_bg = compat::Background::spawn(&self.clock)?;
162        let compat_timer = compat_bg.timer().clone();
163        let compat_reactor = compat_bg.reactor().clone();
164        let compat_sender: Arc<RwLock<Option<super::CompatSpawner<tokio_02::runtime::Handle>>>> =
165            Arc::new(RwLock::new(None));
166
167        // We need a weak ref here so that when we pass this into the
168        // `on_thread_start` closure it's stored as a _weak_ ref and not
169        // a strong one. This is important because tokio 0.2's runtime
170        // should shut down when the compat Runtime has been dropped. There
171        // can be a race condition where we hold onto an extra Arc, which holds
172        // a runtime handle beyond the drop. This causes the tokio 0.2 runtime to
173        // not shut down and leak fds.
174        //
175        // Tokio 0.2's threaded_scheduler will spawn threads that each contain an arced
176        // ref to the `on_thread_start` fn. If the runtime shuts down but there is still
177        // access to a runtime handle, the mio driver will not shutdown. To avoid this we
178        // only want the `on_thread_start` to hold a weak ref and attempt to upgrade, since
179        // the runtime will still be acitve the upgrade should work. Otherwise, somehow
180        // tokio started a new thread after its runtime has been dropped.
181        let compat_sender2 = Arc::downgrade(&compat_sender);
182        let mut lock = compat_sender.write().unwrap();
183
184        let runtime = self
185            .inner
186            .threaded_scheduler()
187            .enable_all()
188            .on_thread_start(move || {
189                // We need the threadpool's sender to set up the default tokio
190                // 0.1 executor. We also need to upgrade the weak pointer. If the
191                // pointer is no longer valid, then the runtime has shut down and the
192                // handle is no longer available.
193                //
194                // This upgrade will only fail if the compat runtime has been dropped.
195                let sender = compat_sender2
196                    .upgrade()
197                    .expect("Runtime dropped but thread started; this is a bug!");
198                let lock = sender.read().unwrap();
199                let compat_sender = lock
200                    .as_ref()
201                    .expect("compat executor needs to be set before the pool is run!")
202                    .clone();
203                compat::set_guards(compat_sender, &compat_timer, &compat_reactor);
204            })
205            .on_thread_stop(|| {
206                compat::unset_guards();
207            })
208            .build()?;
209
210        let (idle, idle_rx) = super::idle::Idle::new();
211        // Set the tokio 0.1 executor to be used by the worker threads.
212        *lock = Some(super::CompatSpawner::new(runtime.handle().clone(), &idle));
213        drop(lock);
214        let runtime = Runtime {
215            inner: Some(Inner { runtime, compat_bg }),
216            idle_rx,
217            idle,
218            compat_sender,
219        };
220
221        Ok(runtime)
222    }
223}
224
225impl Default for Builder {
226    fn default() -> Self {
227        Self::new()
228    }
229}