mio_pool/lib.rs
//! A worker pool collectively handling a set of connections.
//!
//! This crate is written for the use-case where a server is listening for connections, and wants
//! to spread the load of handling accepted connections across multiple threads. Specifically, this
//! crate implements a worker pool whose workers share a single `mio::Poll` instance, and
//! collectively accept new connections and handle events for existing ones.
//!
//! Users will want to start with the `PoolBuilder` struct, which allows creating a new pool from
//! anything that can act as a `Listener` (basically, anything that can be polled and accept new
//! connections that can themselves be polled; e.g., `mio::net::TcpListener`).
//!
//! # Examples
//!
//! ```
//! # extern crate mio;
//! # extern crate mio_pool;
//! # use mio_pool::PoolBuilder;
//! # fn main() {
//! use std::io::prelude::*;
//!
//! let addr = "127.0.0.1:0".parse().unwrap();
//! let server = mio::net::TcpListener::bind(&addr).unwrap();
//! let addr = server.local_addr().unwrap();
//! let pool = PoolBuilder::from(server).unwrap();
//! let h = pool.with_state(Vec::new()).and_return(|v| v)
//!     .run(1 /* # workers */, |c: &mut mio::net::TcpStream, s: &mut Vec<u8>| {
//!         // new data is available on the connection `c`!
//!         let mut buf = [0u8; 1024];
//!
//!         // let's just echo back what we read
//!         let n = c.read(&mut buf)?;
//!         if n == 0 {
//!             return Ok(true);
//!         }
//!         c.write_all(&buf[..n])?;
//!
//!         // keep some internal state
//!         s.extend(&buf[..n]);
//!
//!         // assume there could be more data
//!         Ok(false)
//!     });
//!
//! // new clients can now connect on `addr`
//! use std::net::TcpStream;
//! let mut c = TcpStream::connect(&addr).unwrap();
//! c.write_all(b"hello world").unwrap();
//! let mut buf = [0u8; 1024];
//! let n = c.read(&mut buf).unwrap();
//! assert_eq!(&buf[..n], b"hello world");
//!
//! // we can terminate the pool at any time
//! let results = h.terminate();
//! // results here contains the final state of each worker in the pool.
//! // that is, the final value in each `s` passed to the closure in `run`.
//! let result = results.into_iter().next().unwrap();
//! assert_eq!(&result.unwrap(), b"hello world");
//! # }
//! ```
#![deny(missing_docs)]

extern crate mio;
extern crate slab;

use std::io;
use std::thread;
use std::sync::{atomic, Arc};
use std::os::unix::io::AsRawFd;

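// Exit states for the pool's shared exit flag (see the `exit` field on `PoolHandle`):
// `NO_EXIT` keeps workers running; `EXIT_IMMEDIATE` asks them to exit the next time they are
// idle (set by `PoolHandle::terminate`); `EXIT_EVENTUALLY` asks them to stop accepting new
// connections and to exit once existing connections have closed (set by `PoolHandle::finish`).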
pub(crate) const NO_EXIT: usize = 0;
pub(crate) const EXIT_IMMEDIATE: usize = 1;
pub(crate) const EXIT_EVENTUALLY: usize = 2;

/// An implementation of a `Poll` interface very similar to that of `mio::Poll`.
pub mod poll;

mod builder;
pub use builder::PoolBuilder;
pub(crate) mod worker;

/// Types that implement `Listener` are mio-pollable, and can accept new connections that are
/// themselves mio-pollable.
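///
/// # Examples
///
/// A minimal sketch of implementing `Listener` for a custom type. The `CountingListener`
/// wrapper below is purely hypothetical (it is not part of this crate); it delegates polling
/// and accepting to an inner `mio::net::TcpListener` while keeping a count of accepted
/// connections:
///
/// ```
/// # extern crate mio;
/// # extern crate mio_pool;
/// use std::io;
/// use std::os::unix::io::{AsRawFd, RawFd};
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use mio_pool::Listener;
///
/// /// A hypothetical listener that counts how many connections it has accepted.
/// struct CountingListener {
///     inner: mio::net::TcpListener,
///     accepted: AtomicUsize,
/// }
///
/// // polling is delegated to the wrapped listener's file descriptor
/// impl AsRawFd for CountingListener {
///     fn as_raw_fd(&self) -> RawFd {
///         self.inner.as_raw_fd()
///     }
/// }
///
/// impl Listener for CountingListener {
///     type Connection = mio::net::TcpStream;
///     fn accept(&self) -> io::Result<Self::Connection> {
///         // accept on the inner listener, dropping the peer address
///         let (c, _addr) = self.inner.accept()?;
///         self.accepted.fetch_add(1, Ordering::Relaxed);
///         Ok(c)
///     }
/// }
/// # fn main() {}
/// ```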
pub trait Listener: AsRawFd + Sync + Send {
    /// The type of connections yielded by `accept`.
    type Connection: AsRawFd + Send;

    /// Accept a new connection.
    ///
    /// This method will only be called when `mio::Ready::readable` is raised for the `Listener` by
    /// a `poll`.
    fn accept(&self) -> io::Result<Self::Connection>;
}

impl Listener for mio::net::TcpListener {
    type Connection = mio::net::TcpStream;
    fn accept(&self) -> io::Result<Self::Connection> {
        // `self.accept()` resolves to the inherent `TcpListener::accept` (inherent methods take
        // precedence over trait methods); the peer address it returns is discarded
        self.accept().map(|(c, _)| c)
    }
}

/// A handle to a currently executing mio pool.
///
/// This handle can be used to terminate the running pool, and to wait for its completion.
/// See `PoolHandle::terminate` and `PoolHandle::wait` for details.
pub struct PoolHandle<R> {
    /// Join handles for the pool's worker threads.
    threads: Vec<thread::JoinHandle<R>>,
    /// Exit flag shared with the workers; holds one of `NO_EXIT`, `EXIT_IMMEDIATE`, or
    /// `EXIT_EVENTUALLY`.
    exit: Arc<atomic::AtomicUsize>,
}

impl<R> PoolHandle<R> {
    /// Tell all running workers to terminate, and then wait for their completion.
    ///
    /// Note that this will *not* wait for existing connections to terminate; however, termination
    /// may be delayed until the next time each worker is idle.
    pub fn terminate(self) -> Vec<thread::Result<R>> {
        self.exit.store(EXIT_IMMEDIATE, atomic::Ordering::SeqCst);
        self.wait()
    }

    /// Stop accepting connections and wait for existing connections to complete.
    ///
    /// This method will tell worker threads not to accept new connections, and to exit once all
    /// current connections have been closed.
    ///
    /// Note that this method will *not* immediately drop the `Listener`, so new clients that try to
    /// connect will hang (i.e., not get a connection refused) until the workers have all exited.
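    ///
    /// # Examples
    ///
    /// A minimal sketch of a graceful shutdown, assuming a pool built like the one in the
    /// crate-level example (the worker closure and its byte-counting state are illustrative
    /// only):
    ///
    /// ```no_run
    /// # extern crate mio;
    /// # extern crate mio_pool;
    /// # use mio_pool::PoolBuilder;
    /// # fn main() {
    /// # use std::io::prelude::*;
    /// # let addr = "127.0.0.1:0".parse().unwrap();
    /// # let server = mio::net::TcpListener::bind(&addr).unwrap();
    /// # let pool = PoolBuilder::from(server).unwrap();
    /// let h = pool.with_state(0usize).and_return(|bytes| bytes)
    ///     .run(2, |c: &mut mio::net::TcpStream, bytes: &mut usize| {
    ///         // read and discard whatever the client sent
    ///         let mut buf = [0u8; 1024];
    ///         let n = c.read(&mut buf)?;
    ///         *bytes += n;
    ///         // a zero-byte read means the client closed the connection
    ///         Ok(n == 0)
    ///     });
    ///
    /// // stop accepting new connections, and exit once existing ones have closed
    /// let results = h.finish();
    /// // one final state (here, a byte count) per worker
    /// assert_eq!(results.len(), 2);
    /// # }
    /// ```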
    pub fn finish(self) -> Vec<thread::Result<R>> {
        self.exit.store(EXIT_EVENTUALLY, atomic::Ordering::SeqCst);
        self.wait()
    }

    /// Wait for all running workers to terminate.
    ///
    /// This method will *not* tell worker threads to exit, and so will only return once all
    /// worker threads have crashed (which should not happen in general). Users may instead want to
    /// use `PoolHandle::terminate`.
    pub fn wait(self) -> Vec<thread::Result<R>> {
        self.threads.into_iter().map(|jh| jh.join()).collect()
    }
}