mio_pool/builder.rs
1use std::io;
2use std::sync::{atomic, Arc, Mutex};
3use slab::Slab;
4use std::os::unix::io::AsRawFd;
5
6use {Listener, PoolHandle, NO_EXIT};
7use worker::worker_main;
8use poll::{Poll, Token};
9
10/// Used to configure a mio pool before launching it.
11///
12/// Users will want to call `PoolBuilder::from` to start a new pool from a `Listener`, and then
13/// `PoolBuilder::run` with an `on_ready` callback to begin accepting and handling connections.
14///
15/// At a high level, the resulting pool will consist of `workers` worker threads that each accept
16/// new connections and handle incoming requests. Every time a connection has available data,
17/// `on_ready` will be called by one of the workers. A connection will stay in the pool until
18/// `on_ready` returns an error, or `Ok(true)` to indicate EOF. Unless the pool is started with
19/// `run_stateless`, `on_ready` is given mutable access to worker-local state each time it is
20/// invoked. This can be useful for maintaining caches and the like.
21///
22/// The behavior of the pool can be customized in a couple of ways, most importantly through
23/// `PoolBuilder::with_finalizer` and `PoolBuilder::and_return`. The former runs every accepted
24/// connection through a function before adding it to the connection pool. The latter allows for
25/// returning some part of the worker state after the pool has been terminated (e.g., for
26/// statistics summaries). See the relevant method documentation for more details.
27///
28/// # Examples
29///
30/// ```
31/// # extern crate mio;
32/// # extern crate mio_pool;
33/// # use mio_pool::PoolBuilder;
34/// # fn main() {
35/// let addr = "127.0.0.1:0".parse().unwrap();
36/// let server = mio::net::TcpListener::bind(&addr).unwrap();
37/// let pool = PoolBuilder::from(server).unwrap();
38/// let h = pool.run(1 /* # workers */, |c: &mut mio::net::TcpStream, s: &mut ()| {
39/// use std::io::prelude::*;
40/// let mut buf = [0u8; 1024];
41/// let n = c.read(&mut buf)?;
42/// if n == 0 {
43/// return Ok(true);
44/// }
45/// c.write_all(&buf[..n])?;
46/// Ok(false)
47/// });
48///
49/// // ...
50/// // during this period, new clients can connect
51/// // ...
52///
53/// let results = h.terminate();
54/// // results here contains the final state of each worker in the pool.
55/// // that is, the final value in each `s` passed to the closure in `run`.
56/// let result = results.into_iter().next().unwrap();
57/// assert_eq!(result.unwrap(), ());
58/// # }
59/// ```
60pub struct PoolBuilder<L, C, S, R>
61where
62 L: Listener,
63{
64 pub(super) listener: Arc<L>,
65 pub(super) epoch: Arc<atomic::AtomicUsize>,
66 pub(super) exit: Arc<atomic::AtomicUsize>,
67 pub(super) poll: Arc<Poll>,
68
69 pub(super) initial: S,
70 pub(super) adapter: Arc<Fn(L::Connection) -> C + 'static + Send + Sync>,
71 pub(super) finalizer: Arc<Fn(S) -> R + Send + Sync + 'static>,
72 thread_name_prefix: String,
73}
74
75impl<L> PoolBuilder<L, L::Connection, (), ()>
76where
77 L: Listener,
78{
79 /// Prepare a new pool from the given listener.
80 ///
81 /// The pool will monitor the listener for new connections, and distribute the task of
82 /// accepting them, and handling requests to accepted connections, among a pool of threads.
83 pub fn from(listener: L) -> io::Result<Self> {
84 let poll = Poll::new()?;
85 poll.register(&listener, Token(0))?;
86
87 Ok(PoolBuilder {
88 listener: Arc::new(listener),
89 epoch: Arc::new(atomic::AtomicUsize::new(1)),
90 poll: Arc::new(poll),
91 exit: Arc::new(atomic::AtomicUsize::new(NO_EXIT)),
92
93 initial: (),
94 adapter: Arc::new(|c| c),
95 finalizer: Arc::new(|_| ()),
96
97 thread_name_prefix: String::from("pool-"),
98 })
99 }
100}
101
102impl<L, C> PoolBuilder<L, C, (), ()>
103where
104 L: Listener,
105{
106 /// Set the initial state of each worker thread.
107 ///
108 /// Note that this method will override any finalizer that may have been set!
109 pub fn with_state<S>(self, initial: S) -> PoolBuilder<L, C, S, ()>
110 where
111 S: Clone + Send + 'static,
112 {
113 PoolBuilder {
114 listener: self.listener,
115 epoch: self.epoch,
116 exit: self.exit,
117 poll: self.poll,
118 adapter: self.adapter,
119 thread_name_prefix: self.thread_name_prefix,
120
121 initial,
122 finalizer: Arc::new(|_| ()),
123 }
124 }
125}
126
127impl<L, C, S, R> PoolBuilder<L, C, S, R>
128where
129 L: Listener,
130{
131 /// Set the thread name prefix to use for the worker threads in this pool.
132 pub fn set_thread_name_prefix(mut self, prefix: &str) -> Self {
133 self.thread_name_prefix = prefix.to_string();
134 self
135 }
136
137 /// Run accepted connections through an adapter before adding them to the pool of connections.
138 ///
139 /// This allows users to wrap something akin to an `TcpStream` into a more sophisticated
140 /// connection type (e.g., by adding buffering).
141 pub fn with_adapter<NA, NC>(self, adapter: NA) -> PoolBuilder<L, NC, S, R>
142 where
143 NA: Fn(L::Connection) -> NC + 'static + Send + Sync,
144 NC: AsRawFd + Send + 'static,
145 {
146 PoolBuilder {
147 listener: self.listener,
148 epoch: self.epoch,
149 exit: self.exit,
150 poll: self.poll,
151 initial: self.initial,
152 finalizer: self.finalizer,
153 thread_name_prefix: self.thread_name_prefix,
154
155 adapter: Arc::new(adapter),
156 }
157 }
158
159 /// Specify that a return value should be derived from the state kept by each worker.
160 ///
161 /// This return value is gathered up by the `PoolHandle` returned by `run`.
162 pub fn and_return<NF, NR>(self, fin: NF) -> PoolBuilder<L, C, S, NR>
163 where
164 NF: Fn(S) -> NR + Send + Sync + 'static,
165 NR: Send + 'static,
166 {
167 PoolBuilder {
168 listener: self.listener,
169 epoch: self.epoch,
170 exit: self.exit,
171 poll: self.poll,
172 adapter: self.adapter,
173 initial: self.initial,
174 thread_name_prefix: self.thread_name_prefix,
175
176 finalizer: Arc::new(fin),
177 }
178 }
179}
180
181impl<L, C> PoolBuilder<L, C, (), ()>
182where
183 L: Listener + 'static,
184 C: AsRawFd + Send + 'static,
185{
186 /// Run the pool with a stateless worker callback.
187 pub fn run_stateless<E>(self, workers: usize, on_ready: E) -> PoolHandle<()>
188 where
189 E: Fn(&mut C) -> io::Result<bool> + 'static + Send + Sync,
190 {
191 self.run(workers, move |c, _| on_ready(c))
192 }
193}
194
195impl<L, C, S, R> PoolBuilder<L, C, S, R>
196where
197 L: Listener + 'static,
198 C: AsRawFd + Send + 'static,
199 S: Clone + Send + 'static,
200 R: 'static + Send,
201{
202 /// Run the pool with a connection adapter, local worker state, and a state finalizer.
203 pub fn run<E>(self, workers: usize, on_ready: E) -> PoolHandle<R>
204 where
205 E: Fn(&mut C, &mut S) -> io::Result<bool> + 'static + Send + Sync,
206 {
207 let truth = Arc::new(Mutex::new(Slab::new()));
208 let on_ready = Arc::new(on_ready);
209 let wrkrs: Vec<_> = (0..workers)
210 .map(|i| {
211 worker_main(
212 &*self.thread_name_prefix,
213 i,
214 &self,
215 Arc::clone(&truth),
216 Arc::clone(&on_ready),
217 )
218 })
219 .collect();
220 PoolHandle {
221 threads: wrkrs,
222 exit: self.exit,
223 }
224 }
225}