mio_pool/
builder.rs

1use std::io;
2use std::sync::{atomic, Arc, Mutex};
3use slab::Slab;
4use std::os::unix::io::AsRawFd;
5
6use {Listener, PoolHandle, NO_EXIT};
7use worker::worker_main;
8use poll::{Poll, Token};
9
10/// Used to configure a mio pool before launching it.
11///
12/// Users will want to call `PoolBuilder::from` to start a new pool from a `Listener`, and then
13/// `PoolBuilder::run` with an `on_ready` callback to begin accepting and handling connections.
14///
15/// At a high level, the resulting pool will consist of `workers` worker threads that each accept
16/// new connections and handle incoming requests. Every time a connection has available data,
17/// `on_ready` will be called by one of the workers. A connection will stay in the pool until
18/// `on_ready` returns an error, or `Ok(true)` to indicate EOF. Unless the pool is started with
19/// `run_stateless`, `on_ready` is given mutable access to worker-local state each time it is
20/// invoked. This can be useful for maintaining caches and the like.
21///
22/// The behavior of the pool can be customized in a couple of ways, most importantly through
23/// `PoolBuilder::with_finalizer` and `PoolBuilder::and_return`. The former runs every accepted
24/// connection through a function before adding it to the connection pool. The latter allows for
25/// returning some part of the worker state after the pool has been terminated (e.g., for
26/// statistics summaries). See the relevant method documentation for more details.
27///
28/// # Examples
29///
30/// ```
31/// # extern crate mio;
32/// # extern crate mio_pool;
33/// # use mio_pool::PoolBuilder;
34/// # fn main() {
35/// let addr = "127.0.0.1:0".parse().unwrap();
36/// let server = mio::net::TcpListener::bind(&addr).unwrap();
37/// let pool = PoolBuilder::from(server).unwrap();
38/// let h = pool.run(1 /* # workers */, |c: &mut mio::net::TcpStream, s: &mut ()| {
39///     use std::io::prelude::*;
40///     let mut buf = [0u8; 1024];
41///     let n = c.read(&mut buf)?;
42///     if n == 0 {
43///         return Ok(true);
44///     }
45///     c.write_all(&buf[..n])?;
46///     Ok(false)
47/// });
48///
49/// // ...
50/// // during this period, new clients can connect
51/// // ...
52///
53/// let results = h.terminate();
54/// // results here contains the final state of each worker in the pool.
55/// // that is, the final value in each `s` passed to the closure in `run`.
56/// let result = results.into_iter().next().unwrap();
57/// assert_eq!(result.unwrap(), ());
58/// # }
59/// ```
60pub struct PoolBuilder<L, C, S, R>
61where
62    L: Listener,
63{
64    pub(super) listener: Arc<L>,
65    pub(super) epoch: Arc<atomic::AtomicUsize>,
66    pub(super) exit: Arc<atomic::AtomicUsize>,
67    pub(super) poll: Arc<Poll>,
68
69    pub(super) initial: S,
70    pub(super) adapter: Arc<Fn(L::Connection) -> C + 'static + Send + Sync>,
71    pub(super) finalizer: Arc<Fn(S) -> R + Send + Sync + 'static>,
72    thread_name_prefix: String,
73}
74
75impl<L> PoolBuilder<L, L::Connection, (), ()>
76where
77    L: Listener,
78{
79    /// Prepare a new pool from the given listener.
80    ///
81    /// The pool will monitor the listener for new connections, and distribute the task of
82    /// accepting them, and handling requests to accepted connections, among a pool of threads.
83    pub fn from(listener: L) -> io::Result<Self> {
84        let poll = Poll::new()?;
85        poll.register(&listener, Token(0))?;
86
87        Ok(PoolBuilder {
88            listener: Arc::new(listener),
89            epoch: Arc::new(atomic::AtomicUsize::new(1)),
90            poll: Arc::new(poll),
91            exit: Arc::new(atomic::AtomicUsize::new(NO_EXIT)),
92
93            initial: (),
94            adapter: Arc::new(|c| c),
95            finalizer: Arc::new(|_| ()),
96
97            thread_name_prefix: String::from("pool-"),
98        })
99    }
100}
101
102impl<L, C> PoolBuilder<L, C, (), ()>
103where
104    L: Listener,
105{
106    /// Set the initial state of each worker thread.
107    ///
108    /// Note that this method will override any finalizer that may have been set!
109    pub fn with_state<S>(self, initial: S) -> PoolBuilder<L, C, S, ()>
110    where
111        S: Clone + Send + 'static,
112    {
113        PoolBuilder {
114            listener: self.listener,
115            epoch: self.epoch,
116            exit: self.exit,
117            poll: self.poll,
118            adapter: self.adapter,
119            thread_name_prefix: self.thread_name_prefix,
120
121            initial,
122            finalizer: Arc::new(|_| ()),
123        }
124    }
125}
126
127impl<L, C, S, R> PoolBuilder<L, C, S, R>
128where
129    L: Listener,
130{
131    /// Set the thread name prefix to use for the worker threads in this pool.
132    pub fn set_thread_name_prefix(mut self, prefix: &str) -> Self {
133        self.thread_name_prefix = prefix.to_string();
134        self
135    }
136
137    /// Run accepted connections through an adapter before adding them to the pool of connections.
138    ///
139    /// This allows users to wrap something akin to an `TcpStream` into a more sophisticated
140    /// connection type (e.g., by adding buffering).
141    pub fn with_adapter<NA, NC>(self, adapter: NA) -> PoolBuilder<L, NC, S, R>
142    where
143        NA: Fn(L::Connection) -> NC + 'static + Send + Sync,
144        NC: AsRawFd + Send + 'static,
145    {
146        PoolBuilder {
147            listener: self.listener,
148            epoch: self.epoch,
149            exit: self.exit,
150            poll: self.poll,
151            initial: self.initial,
152            finalizer: self.finalizer,
153            thread_name_prefix: self.thread_name_prefix,
154
155            adapter: Arc::new(adapter),
156        }
157    }
158
159    /// Specify that a return value should be derived from the state kept by each worker.
160    ///
161    /// This return value is gathered up by the `PoolHandle` returned by `run`.
162    pub fn and_return<NF, NR>(self, fin: NF) -> PoolBuilder<L, C, S, NR>
163    where
164        NF: Fn(S) -> NR + Send + Sync + 'static,
165        NR: Send + 'static,
166    {
167        PoolBuilder {
168            listener: self.listener,
169            epoch: self.epoch,
170            exit: self.exit,
171            poll: self.poll,
172            adapter: self.adapter,
173            initial: self.initial,
174            thread_name_prefix: self.thread_name_prefix,
175
176            finalizer: Arc::new(fin),
177        }
178    }
179}
180
181impl<L, C> PoolBuilder<L, C, (), ()>
182where
183    L: Listener + 'static,
184    C: AsRawFd + Send + 'static,
185{
186    /// Run the pool with a stateless worker callback.
187    pub fn run_stateless<E>(self, workers: usize, on_ready: E) -> PoolHandle<()>
188    where
189        E: Fn(&mut C) -> io::Result<bool> + 'static + Send + Sync,
190    {
191        self.run(workers, move |c, _| on_ready(c))
192    }
193}
194
195impl<L, C, S, R> PoolBuilder<L, C, S, R>
196where
197    L: Listener + 'static,
198    C: AsRawFd + Send + 'static,
199    S: Clone + Send + 'static,
200    R: 'static + Send,
201{
202    /// Run the pool with a connection adapter, local worker state, and a state finalizer.
203    pub fn run<E>(self, workers: usize, on_ready: E) -> PoolHandle<R>
204    where
205        E: Fn(&mut C, &mut S) -> io::Result<bool> + 'static + Send + Sync,
206    {
207        let truth = Arc::new(Mutex::new(Slab::new()));
208        let on_ready = Arc::new(on_ready);
209        let wrkrs: Vec<_> = (0..workers)
210            .map(|i| {
211                worker_main(
212                    &*self.thread_name_prefix,
213                    i,
214                    &self,
215                    Arc::clone(&truth),
216                    Arc::clone(&on_ready),
217                )
218            })
219            .collect();
220        PoolHandle {
221            threads: wrkrs,
222            exit: self.exit,
223        }
224    }
225}