tokio_proto/
tcp_server.rs

1use std::io;
2use std::marker::PhantomData;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::thread;
6
7use BindServer;
8use futures::stream::Stream;
9use futures::future::{Then, Future};
10use net2;
11use tokio_core::net::{TcpStream, TcpListener};
12use tokio_core::reactor::{Core, Handle};
13use tokio_service::{NewService, Service};
14
15// TODO: Add more options, e.g.:
16// - max concurrent requests
17// - request timeout
18// - read timeout
19// - write timeout
20// - max idle time
21// - max lifetime
22
/// Builder used to configure and launch a TCP server.
///
/// Three things are required to describe a server:
///
/// - the server protocol implementation (`P`)
/// - the socket address to listen on
/// - the service that is handed to `serve` or `with_handle`
///
/// On top of these the builder carries a small amount of extra
/// configuration, which is expected to grow over time.
///
/// See the crate docs for an example.
#[derive(Debug)]
pub struct TcpServer<Kind, P> {
    // Zero-sized marker that ties the builder to its `Kind` parameter.
    _kind: PhantomData<Kind>,
    // Protocol implementation, reference-counted so it can be shared.
    proto: Arc<P>,
    // Number of event-loop threads to run.
    threads: usize,
    // Address the server listens on.
    addr: SocketAddr,
}
42
43impl<Kind, P> TcpServer<Kind, P> where
44    P: BindServer<Kind, TcpStream> + Send + Sync + 'static
45{
46    /// Starts building a server for the given protocol and address, with
47    /// default configuration.
48    ///
49    /// Generally, a protocol is implemented *not* by implementing the
50    /// `BindServer` trait directly, but instead by implementing one of the
51    /// protocol traits:
52    ///
53    /// - `pipeline::ServerProto`
54    /// - `multiplex::ServerProto`
55    /// - `streaming::pipeline::ServerProto`
56    /// - `streaming::multiplex::ServerProto`
57    ///
58    /// See the crate documentation for more details on those traits.
59    pub fn new(protocol: P, addr: SocketAddr) -> TcpServer<Kind, P> {
60        TcpServer {
61            _kind: PhantomData,
62            proto: Arc::new(protocol),
63            threads: 1,
64            addr: addr,
65        }
66    }
67
68    /// Set the address for the server.
69    pub fn addr(&mut self, addr: SocketAddr) {
70        self.addr = addr;
71    }
72
73    /// Set the number of threads running simultaneous event loops (Unix only).
74    pub fn threads(&mut self, threads: usize) {
75        assert!(threads > 0);
76        if cfg!(unix) {
77            self.threads = threads;
78        }
79    }
80
81    /// Start up the server, providing the given service on it.
82    ///
83    /// This method will block the current thread until the server is shut down.
84    pub fn serve<S>(&self, new_service: S) where
85        S: NewService + Send + Sync + 'static,
86        S::Instance: 'static,
87        P::ServiceError: 'static,
88        P::ServiceResponse: 'static,
89        P::ServiceRequest: 'static,
90        S::Request: From<P::ServiceRequest>,
91        S::Response: Into<P::ServiceResponse>,
92        S::Error: Into<P::ServiceError>,
93    {
94        let new_service = Arc::new(new_service);
95        self.with_handle(move |_| new_service.clone())
96    }
97
98    /// Start up the server, providing the given service on it, and providing
99    /// access to the event loop handle.
100    ///
101    /// The `new_service` argument is a closure that is given an event loop
102    /// handle, and produces a value implementing `NewService`. That value is in
103    /// turn used to make a new service instance for each incoming connection.
104    ///
105    /// This method will block the current thread until the server is shut down.
106    pub fn with_handle<F, S>(&self, new_service: F) where
107        F: Fn(&Handle) -> S + Send + Sync + 'static,
108        S: NewService + Send + Sync + 'static,
109        S::Instance: 'static,
110        P::ServiceError: 'static,
111        P::ServiceResponse: 'static,
112        P::ServiceRequest: 'static,
113        S::Request: From<P::ServiceRequest>,
114        S::Response: Into<P::ServiceResponse>,
115        S::Error: Into<P::ServiceError>,
116    {
117        let proto = self.proto.clone();
118        let new_service = Arc::new(new_service);
119        let addr = self.addr;
120        let workers = self.threads;
121
122        let threads = (0..self.threads - 1).map(|i| {
123            let proto = proto.clone();
124            let new_service = new_service.clone();
125
126            thread::Builder::new().name(format!("worker{}", i)).spawn(move || {
127                serve(proto, addr, workers, &*new_service)
128            }).unwrap()
129        }).collect::<Vec<_>>();
130
131        serve(proto, addr, workers, &*new_service);
132
133        for thread in threads {
134            thread.join().unwrap();
135        }
136    }
137}
138
139fn serve<P, Kind, F, S>(binder: Arc<P>, addr: SocketAddr, workers: usize, new_service: &F)
140    where P: BindServer<Kind, TcpStream>,
141          F: Fn(&Handle) -> S,
142          S: NewService + Send + Sync,
143          S::Instance: 'static,
144          P::ServiceError: 'static,
145          P::ServiceResponse: 'static,
146          P::ServiceRequest: 'static,
147          S::Request: From<P::ServiceRequest>,
148          S::Response: Into<P::ServiceResponse>,
149          S::Error: Into<P::ServiceError>,
150{
151    struct WrapService<S, Request, Response, Error> {
152        inner: S,
153        _marker: PhantomData<fn() -> (Request, Response, Error)>,
154    }
155
156    impl<S, Request, Response, Error> Service for WrapService<S, Request, Response, Error>
157        where S: Service,
158              S::Request: From<Request>,
159              S::Response: Into<Response>,
160              S::Error: Into<Error>,
161    {
162        type Request = Request;
163        type Response = Response;
164        type Error = Error;
165        type Future = Then<S::Future,
166                           Result<Response, Error>,
167                           fn(Result<S::Response, S::Error>) -> Result<Response, Error>>;
168
169        fn call(&self, req: Request) -> Self::Future {
170            fn change_types<A, B, C, D>(r: Result<A, B>) -> Result<C, D>
171                where A: Into<C>,
172                      B: Into<D>,
173            {
174                match r {
175                    Ok(e) => Ok(e.into()),
176                    Err(e) => Err(e.into()),
177                }
178            }
179
180            self.inner.call(S::Request::from(req)).then(change_types)
181        }
182    }
183
184    let mut core = Core::new().unwrap();
185    let handle = core.handle();
186    let new_service = new_service(&handle);
187    let listener = listener(&addr, workers, &handle).unwrap();
188
189    let server = listener.incoming().for_each(move |(socket, _)| {
190        // Create the service
191        let service = try!(new_service.new_service());
192
193        // Bind it!
194        binder.bind_server(&handle, socket, WrapService {
195            inner: service,
196            _marker: PhantomData,
197        });
198
199        Ok(())
200    });
201
202    core.run(server).unwrap();
203}
204
205fn listener(addr: &SocketAddr,
206            workers: usize,
207            handle: &Handle) -> io::Result<TcpListener> {
208    let listener = match *addr {
209        SocketAddr::V4(_) => try!(net2::TcpBuilder::new_v4()),
210        SocketAddr::V6(_) => try!(net2::TcpBuilder::new_v6()),
211    };
212    try!(configure_tcp(workers, &listener));
213    try!(listener.reuse_address(true));
214    try!(listener.bind(addr));
215    listener.listen(1024).and_then(|l| {
216        TcpListener::from_listener(l, addr, handle)
217    })
218}
219
220#[cfg(unix)]
221fn configure_tcp(workers: usize, tcp: &net2::TcpBuilder) -> io::Result<()> {
222    use net2::unix::*;
223
224    if workers > 1 {
225        try!(tcp.reuse_port(true));
226    }
227
228    Ok(())
229}
230
/// Fallback socket setup for every non-Unix target: no extra options are
/// applied and the call always succeeds.
///
/// The gate is `not(unix)` rather than `windows` so that it is the exact
/// complement of the `cfg(unix)` variant. `TcpServer::threads` already
/// treats every non-Unix target as single-threaded via `cfg!(unix)`; with
/// a `windows`-only gate, any target that is neither Unix nor Windows had
/// no `configure_tcp` at all and failed to compile.
#[cfg(not(unix))]
fn configure_tcp(_workers: usize, _tcp: &net2::TcpBuilder) -> io::Result<()> {
    Ok(())
}
234}