1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
//! A worker pool collectively handling a set of connections.
//!
//! This crate is written for the use-case where a server is listening for connections, and wants
//! to spread the load of handling accepted connections across multiple threads. Specifically, this
//! crate implements a worker pool that shares a single `mio::Poll` instance, and collectively
//! accept new connections and handle events for existing ones.
//!
//! Users will want to start with the `PoolBuilder` struct, which allows creating a new pool from
//! anything that can act as a `Listener` (basically, anything that can be polled and accept new
//! connections that can themselves be polled; e.g., `mio::net::TcpListener`).
//!
//! # Examples
//!
//! ```
//! # extern crate mio;
//! # extern crate mio_pool;
//! # use mio_pool::PoolBuilder;
//! # fn main() {
//! use std::io::prelude::*;
//!
//! let addr = "127.0.0.1:0".parse().unwrap();
//! let server = mio::net::TcpListener::bind(&addr).unwrap();
//! let addr = server.local_addr().unwrap();
//! let pool = PoolBuilder::from(server).unwrap();
//! let h = pool.with_state(Vec::new()).and_return(|v| v)
//! .run(1 /* # workers */, |c: &mut mio::net::TcpStream, s: &mut Vec<u8>| {
//! // new data is available on the connection `c`!
//! let mut buf = [0u8; 1024];
//!
//! // let's just echo back what we read
//! let n = c.read(&mut buf)?;
//! if n == 0 {
//! return Ok(true);
//! }
//! c.write_all(&buf[..n])?;
//!
//! // keep some internal state
//! s.extend(&buf[..n]);
//!
//! // assume there could be more data
//! Ok(false)
//! });
//!
//! // new clients can now connect on `addr`
//! use std::net::TcpStream;
//! let mut c = TcpStream::connect(&addr).unwrap();
//! c.write_all(b"hello world").unwrap();
//! let mut buf = [0u8; 1024];
//! let n = c.read(&mut buf).unwrap();
//! assert_eq!(&buf[..n], b"hello world");
//!
//! // we can terminate the pool at any time
//! let results = h.terminate();
//! // results here contains the final state of each worker in the pool.
//! // that is, the final value in each `s` passed to the closure in `run`.
//! let result = results.into_iter().next().unwrap();
//! assert_eq!(&result.unwrap(), b"hello world");
//! # }
//! ```
#![deny(missing_docs)]
extern crate mio;
extern crate slab;
use std::io;
use std::thread;
use std::sync::{atomic, Arc};
use std::os::unix::io::AsRawFd;
pub(crate) const NO_EXIT: usize = 0;
pub(crate) const EXIT_IMMEDIATE: usize = 1;
pub(crate) const EXIT_EVENTUALLY: usize = 2;
/// An implementation of a `Poll` interface very similar to that of `mio::Poll`.
pub mod poll;
mod builder;
pub use builder::PoolBuilder;
pub(crate) mod worker;
/// Types that implement `Listener` are mio-pollable, and can accept new connections that are
/// themselves mio-pollable.
pub trait Listener: AsRawFd + Sync + Send {
/// The type of connections yielded by `accept`.
type Connection: AsRawFd + Send;
/// Accept a new connection.
///
/// This method will only be called when `mio::Ready::readable` is raised for the `Listener` by
/// a `poll`.
fn accept(&self) -> io::Result<Self::Connection>;
}
impl Listener for mio::net::TcpListener {
type Connection = mio::net::TcpStream;
fn accept(&self) -> io::Result<Self::Connection> {
self.accept().map(|(c, _)| c)
}
}
/// A handle to a currently executing mio pool.
///
/// This handle can be used to terminate the running pool, and to wait for its completion.
/// See `PoolHandle::terminate` and `PoolHandle::wait` for details.
pub struct PoolHandle<R> {
threads: Vec<thread::JoinHandle<R>>,
exit: Arc<atomic::AtomicUsize>,
}
impl<R> PoolHandle<R> {
/// Tell all running workers to terminate, and then wait for their completion.
///
/// Note that this will *not* wait for existing connections to terminate, but termination may
/// be delayed until the next time each worker is idle.
pub fn terminate(self) -> Vec<thread::Result<R>> {
self.exit.store(EXIT_IMMEDIATE, atomic::Ordering::SeqCst);
self.wait()
}
/// Stop accepting connections and wait for existing connections to complete.
///
/// This method will tell worker threads not to accept new connetions, and to exit once all
/// current connections have been closed.
///
/// Note that this method will *not* immediately drop the Listener, so new clients that try to
/// connect will hang (i.e., not get a connection refused) until the workers have all exited.
pub fn finish(self) -> Vec<thread::Result<R>> {
self.exit.store(EXIT_EVENTUALLY, atomic::Ordering::SeqCst);
self.wait()
}
/// Wait for all running workers to terminate.
///
/// This method will *not* tell worker threads to exit, and so will only return once when all
/// worker threads have crashed (which should not happen in general). Users may instead want to
/// use `PoolHandle::terminate`.
pub fn wait(self) -> Vec<thread::Result<R>> {
self.threads.into_iter().map(|jh| jh.join()).collect()
}
}