mio_pool/
lib.rs

1//! A worker pool collectively handling a set of connections.
2//!
3//! This crate is written for the use-case where a server is listening for connections, and wants
4//! to spread the load of handling accepted connections across multiple threads. Specifically, this
//! crate implements a worker pool whose threads share a single `mio::Poll` instance, and
//! collectively accept new connections and handle events for existing ones.
7//!
8//! Users will want to start with the `PoolBuilder` struct, which allows creating a new pool from
9//! anything that can act as a `Listener` (basically, anything that can be polled and accept new
10//! connections that can themselves be polled; e.g., `mio::net::TcpListener`).
11//!
12//! # Examples
13//!
14//! ```
15//! # extern crate mio;
16//! # extern crate mio_pool;
17//! # use mio_pool::PoolBuilder;
18//! # fn main() {
19//! use std::io::prelude::*;
20//!
21//! let addr = "127.0.0.1:0".parse().unwrap();
22//! let server = mio::net::TcpListener::bind(&addr).unwrap();
23//! let addr = server.local_addr().unwrap();
24//! let pool = PoolBuilder::from(server).unwrap();
25//! let h = pool.with_state(Vec::new()).and_return(|v| v)
26//!     .run(1 /* # workers */, |c: &mut mio::net::TcpStream, s: &mut Vec<u8>| {
27//!         // new data is available on the connection `c`!
28//!         let mut buf = [0u8; 1024];
29//!
30//!         // let's just echo back what we read
31//!         let n = c.read(&mut buf)?;
32//!         if n == 0 {
33//!             return Ok(true);
34//!         }
35//!         c.write_all(&buf[..n])?;
36//!
37//!         // keep some internal state
38//!         s.extend(&buf[..n]);
39//!
40//!         // assume there could be more data
41//!         Ok(false)
42//!     });
43//!
44//! // new clients can now connect on `addr`
45//! use std::net::TcpStream;
46//! let mut c = TcpStream::connect(&addr).unwrap();
47//! c.write_all(b"hello world").unwrap();
48//! let mut buf = [0u8; 1024];
49//! let n = c.read(&mut buf).unwrap();
50//! assert_eq!(&buf[..n], b"hello world");
51//!
52//! // we can terminate the pool at any time
53//! let results = h.terminate();
54//! // results here contains the final state of each worker in the pool.
55//! // that is, the final value in each `s` passed to the closure in `run`.
56//! let result = results.into_iter().next().unwrap();
57//! assert_eq!(&result.unwrap(), b"hello world");
58//! # }
59//! ```
60#![deny(missing_docs)]
61
62extern crate mio;
63extern crate slab;
64
65use std::io;
66use std::thread;
67use std::sync::{atomic, Arc};
68use std::os::unix::io::AsRawFd;
69
/// Exit-flag value that requests no shutdown.
// NOTE(review): presumably the initial value of the pool's exit flag — confirm in `builder`/`worker`.
pub(crate) const NO_EXIT: usize = 0;

/// Exit-flag value stored by `PoolHandle::terminate`: workers should stop without waiting for
/// existing connections to close.
pub(crate) const EXIT_IMMEDIATE: usize = 1;

/// Exit-flag value stored by `PoolHandle::finish`: workers should stop accepting connections and
/// exit once the current ones have closed.
pub(crate) const EXIT_EVENTUALLY: usize = 2;
73
74/// An implementation of a `Poll` interface very similar to that of `mio::Poll`.
75pub mod poll;
76
77mod builder;
78pub use builder::PoolBuilder;
79pub(crate) mod worker;
80
/// A mio-pollable source of new connections, each of which is itself mio-pollable.
///
/// Both the listener and the connections it yields must expose a raw file descriptor
/// (`AsRawFd`). The listener must additionally be `Send + Sync`, and its connections `Send`.
pub trait Listener: Send + Sync + AsRawFd {
    /// The connection type produced by `accept`.
    type Connection: Send + AsRawFd;

    /// Accept a single new connection.
    ///
    /// This is only invoked after a `poll` has raised `mio::Ready::readable` for the
    /// `Listener`.
    fn accept(&self) -> io::Result<Self::Connection>;
}
93
94impl Listener for mio::net::TcpListener {
95    type Connection = mio::net::TcpStream;
96    fn accept(&self) -> io::Result<Self::Connection> {
97        self.accept().map(|(c, _)| c)
98    }
99}
100
/// Handle to a mio pool whose workers are currently running.
///
/// Use it to shut the pool down (`PoolHandle::terminate`, `PoolHandle::finish`) or to block
/// until the workers have exited (`PoolHandle::wait`). `R` is the value returned by each
/// worker thread.
pub struct PoolHandle<R> {
    // Join handles for the pool's threads; joined in order by `wait`.
    threads: Vec<thread::JoinHandle<R>>,
    // Exit flag written by `terminate`/`finish` with one of the `EXIT_*` values.
    // NOTE(review): presumably the workers hold clones of this `Arc` — confirm in `worker`.
    exit: Arc<atomic::AtomicUsize>,
}
109
110impl<R> PoolHandle<R> {
111    /// Tell all running workers to terminate, and then wait for their completion.
112    ///
113    /// Note that this will *not* wait for existing connections to terminate, but termination may
114    /// be delayed until the next time each worker is idle.
115    pub fn terminate(self) -> Vec<thread::Result<R>> {
116        self.exit.store(EXIT_IMMEDIATE, atomic::Ordering::SeqCst);
117        self.wait()
118    }
119
120    /// Stop accepting connections and wait for existing connections to complete.
121    ///
122    /// This method will tell worker threads not to accept new connetions, and to exit once all
123    /// current connections have been closed.
124    ///
125    /// Note that this method will *not* immediately drop the Listener, so new clients that try to
126    /// connect will hang (i.e., not get a connection refused) until the workers have all exited.
127    pub fn finish(self) -> Vec<thread::Result<R>> {
128        self.exit.store(EXIT_EVENTUALLY, atomic::Ordering::SeqCst);
129        self.wait()
130    }
131
132    /// Wait for all running workers to terminate.
133    ///
134    /// This method will *not* tell worker threads to exit, and so will only return once when all
135    /// worker threads have crashed (which should not happen in general). Users may instead want to
136    /// use `PoolHandle::terminate`.
137    pub fn wait(self) -> Vec<thread::Result<R>> {
138        self.threads.into_iter().map(|jh| jh.join()).collect()
139    }
140}