1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
use std::io;
use std::sync::{atomic, Arc, Mutex};
use slab::Slab;
use std::os::unix::io::AsRawFd;

use {Listener, PoolHandle, NO_EXIT};
use worker::worker_main;
use poll::{Poll, Token};

/// Used to configure a mio pool before launching it.
///
/// Users will want to call `PoolBuilder::from` to start a new pool from a `Listener`, and then
/// `PoolBuilder::run` with an `on_ready` callback to begin accepting and handling connections.
///
/// At a high level, the resulting pool will consist of `workers` worker threads that each accept
/// new connections and handle incoming requests. Every time a connection has available data,
/// `on_ready` will be called by one of the workers. A connection will stay in the pool until
/// `on_ready` returns an error, or `Ok(true)` to indicate EOF. Unless the pool is started with
/// `run_stateless`, `on_ready` is given mutable access to worker-local state each time it is
/// invoked. This can be useful for maintaining caches and the like.
///
/// The behavior of the pool can be customized in a couple of ways, most importantly through
/// `PoolBuilder::with_adapter` and `PoolBuilder::and_return`. The former runs every accepted
/// connection through a function before adding it to the connection pool. The latter allows for
/// returning some part of the worker state after the pool has been terminated (e.g., for
/// statistics summaries). The initial worker state itself is set with `PoolBuilder::with_state`.
/// See the relevant method documentation for more details.
///
/// The type parameters are: `L`, the listener type; `C`, the connection type handed to
/// `on_ready` (the output of the adapter); `S`, the worker-local state; and `R`, the value the
/// finalizer derives from `S`.
///
/// # Examples
///
/// ```
/// # extern crate mio;
/// # extern crate mio_pool;
/// # use mio_pool::PoolBuilder;
/// # fn main() {
/// let addr = "127.0.0.1:0".parse().unwrap();
/// let server = mio::net::TcpListener::bind(&addr).unwrap();
/// let pool = PoolBuilder::from(server).unwrap();
/// let h = pool.run(1 /* # workers */, |c: &mut mio::net::TcpStream, s: &mut ()| {
///     use std::io::prelude::*;
///     let mut buf = [0u8; 1024];
///     let n = c.read(&mut buf)?;
///     if n == 0 {
///         return Ok(true);
///     }
///     c.write_all(&buf[..n])?;
///     Ok(false)
/// });
///
/// // ...
/// // during this period, new clients can connect
/// // ...
///
/// let results = h.terminate();
/// // results here contains the final state of each worker in the pool.
/// // that is, the final value in each `s` passed to the closure in `run`.
/// let result = results.into_iter().next().unwrap();
/// assert_eq!(result.unwrap(), ());
/// # }
/// ```
pub struct PoolBuilder<L, C, S, R>
where
    L: Listener,
{
    /// The listener that new connections are accepted from; registered with `poll` under
    /// `Token(0)` by `PoolBuilder::from`.
    pub(super) listener: Arc<L>,
    /// Shared counter, initialized to 1 by `PoolBuilder::from`. How it is advanced is decided
    /// by the worker code, which is not part of this file.
    pub(super) epoch: Arc<atomic::AtomicUsize>,
    /// Shared flag, initialized to `NO_EXIT`, that is also handed to the `PoolHandle` returned
    /// by `run` (presumably so the handle can tell the workers to stop; see the worker code).
    pub(super) exit: Arc<atomic::AtomicUsize>,
    /// The poll instance that the listener is registered with.
    pub(super) poll: Arc<Poll>,

    /// The initial worker state (`()` unless replaced through `with_state`).
    pub(super) initial: S,
    /// Maps a freshly accepted `L::Connection` to the connection type `C` kept in the pool.
    pub(super) adapter: Arc<Fn(L::Connection) -> C + 'static + Send + Sync>,
    /// Maps a worker's state `S` to the value `R` gathered by the `PoolHandle`.
    pub(super) finalizer: Arc<Fn(S) -> R + Send + Sync + 'static>,
}

impl<L> PoolBuilder<L, L::Connection, (), ()>
where
    L: Listener,
{
    /// Create a builder for a pool that serves connections accepted from `listener`.
    ///
    /// A fresh `Poll` is created, and the listener is registered with it under `Token(0)`. The
    /// returned builder starts out with `()` as the worker state, an adapter that hands accepted
    /// connections through unchanged, and a finalizer that yields `()`.
    ///
    /// # Errors
    ///
    /// Returns an error if the `Poll` cannot be created, or if registering the listener with it
    /// fails.
    pub fn from(listener: L) -> io::Result<Self> {
        let poller = Poll::new()?;
        poller.register(&listener, Token(0))?;

        let epoch = atomic::AtomicUsize::new(1);
        let exit = atomic::AtomicUsize::new(NO_EXIT);

        // No customization yet: connections pass straight through, worker state is discarded.
        Ok(PoolBuilder {
            initial: (),
            adapter: Arc::new(|conn| conn),
            finalizer: Arc::new(|_state| ()),

            listener: Arc::new(listener),
            poll: Arc::new(poller),
            epoch: Arc::new(epoch),
            exit: Arc::new(exit),
        })
    }
}

impl<L, C> PoolBuilder<L, C, (), ()>
where
    L: Listener,
{
    /// Choose the state that each worker thread starts out with.
    ///
    /// Beware that the builder returned from this method always carries the default finalizer
    /// (one that discards the state and yields `()`); any finalizer set earlier is replaced.
    pub fn with_state<S>(self, initial: S) -> PoolBuilder<L, C, S, ()>
    where
        S: Clone + Send + 'static,
    {
        // Carry over everything that does not depend on the state type.
        let PoolBuilder {
            listener,
            epoch,
            exit,
            poll,
            adapter,
            ..
        } = self;

        PoolBuilder {
            initial,
            finalizer: Arc::new(|_state| ()),

            listener,
            epoch,
            exit,
            poll,
            adapter,
        }
    }
}

impl<L, C, S, R> PoolBuilder<L, C, S, R>
where
    L: Listener,
{
    /// Pass every accepted connection through `adapter` before it joins the pool of connections.
    ///
    /// This lets users wrap something akin to a `TcpStream` in a more sophisticated connection
    /// type (e.g., one that adds buffering). The adapter's output type becomes the connection
    /// type of the returned builder; all other settings are carried over unchanged.
    pub fn with_adapter<NA, NC>(self, adapter: NA) -> PoolBuilder<L, NC, S, R>
    where
        NA: Fn(L::Connection) -> NC + 'static + Send + Sync,
        NC: AsRawFd + Send + 'static,
    {
        // Everything except the previous adapter is kept.
        let PoolBuilder {
            listener,
            epoch,
            exit,
            poll,
            initial,
            finalizer,
            ..
        } = self;

        PoolBuilder {
            adapter: Arc::new(adapter),

            listener,
            epoch,
            exit,
            poll,
            initial,
            finalizer,
        }
    }

    /// Derive a return value from the state kept by each worker.
    ///
    /// `fin` replaces the builder's current finalizer. The values it produces are gathered up by
    /// the `PoolHandle` returned by `run`.
    pub fn and_return<NF, NR>(self, fin: NF) -> PoolBuilder<L, C, S, NR>
    where
        NF: Fn(S) -> NR + Send + Sync + 'static,
        NR: Send + 'static,
    {
        // Everything except the previous finalizer is kept.
        let PoolBuilder {
            listener,
            epoch,
            exit,
            poll,
            adapter,
            initial,
            ..
        } = self;

        PoolBuilder {
            finalizer: Arc::new(fin),

            listener,
            epoch,
            exit,
            poll,
            adapter,
            initial,
        }
    }
}

impl<L, C> PoolBuilder<L, C, (), ()>
where
    L: Listener + 'static,
    C: AsRawFd + Send + 'static,
{
    /// Start the pool with `workers` workers and a callback that needs no worker state.
    ///
    /// This simply forwards to `run`, invoking `on_ready` with only the connection and ignoring
    /// the `()` state argument.
    pub fn run_stateless<E>(self, workers: usize, on_ready: E) -> PoolHandle<()>
    where
        E: Fn(&mut C) -> io::Result<bool> + 'static + Send + Sync,
    {
        self.run(workers, move |conn: &mut C, _state: &mut ()| {
            on_ready(conn)
        })
    }
}

impl<L, C, S, R> PoolBuilder<L, C, S, R>
where
    L: Listener + 'static,
    C: AsRawFd + Send + 'static,
    S: Clone + Send + 'static,
    R: 'static + Send,
{
    /// Start the pool, using the configured connection adapter, worker state, and finalizer.
    ///
    /// `worker_main` is called once for each worker index in `0..workers`. Every call is given
    /// this builder, a handle to one shared, mutex-protected `Slab`, and a handle to the shared
    /// `on_ready` callback. The values returned by `worker_main` are collected into the returned
    /// `PoolHandle`, together with the builder's exit flag.
    ///
    /// Note that with `workers == 0` no worker is started at all.
    pub fn run<E>(self, workers: usize, on_ready: E) -> PoolHandle<R>
    where
        E: Fn(&mut C, &mut S) -> io::Result<bool> + 'static + Send + Sync,
    {
        let callback = Arc::new(on_ready);
        // A single slab shared by all workers.
        let truth = Arc::new(Mutex::new(Slab::new()));

        let mut threads = Vec::with_capacity(workers);
        for id in 0..workers {
            let worker = worker_main(id, &self, Arc::clone(&truth), Arc::clone(&callback));
            threads.push(worker);
        }

        PoolHandle {
            threads,
            exit: self.exit,
        }
    }
}