1use std::io;
2use std::marker::PhantomData;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::thread;
6
7use BindServer;
8use futures::stream::Stream;
9use futures::future::{Then, Future};
10use net2;
11use tokio_core::net::{TcpStream, TcpListener};
12use tokio_core::reactor::{Core, Handle};
13use tokio_service::{NewService, Service};
14
/// A TCP server builder for a protocol `P` of the given `Kind`.
///
/// Holds the listening address, the number of worker threads and the
/// protocol used to bind each accepted `TcpStream` to a service.
#[derive(Debug)]
pub struct TcpServer<Kind, P> {
    /// Zero-sized marker that ties the server to `Kind`, which is otherwise
    /// only used in the `BindServer<Kind, TcpStream>` bound on `P`.
    _kind: PhantomData<Kind>,
    /// The protocol, kept behind an `Arc` so a clone of the pointer can be
    /// handed to every worker thread.
    proto: Arc<P>,
    /// Number of worker threads to run; starts at 1 and is never 0.
    threads: usize,
    /// Address the listening socket is bound to.
    addr: SocketAddr,
}
42
43impl<Kind, P> TcpServer<Kind, P> where
44 P: BindServer<Kind, TcpStream> + Send + Sync + 'static
45{
46 pub fn new(protocol: P, addr: SocketAddr) -> TcpServer<Kind, P> {
60 TcpServer {
61 _kind: PhantomData,
62 proto: Arc::new(protocol),
63 threads: 1,
64 addr: addr,
65 }
66 }
67
68 pub fn addr(&mut self, addr: SocketAddr) {
70 self.addr = addr;
71 }
72
73 pub fn threads(&mut self, threads: usize) {
75 assert!(threads > 0);
76 if cfg!(unix) {
77 self.threads = threads;
78 }
79 }
80
81 pub fn serve<S>(&self, new_service: S) where
85 S: NewService + Send + Sync + 'static,
86 S::Instance: 'static,
87 P::ServiceError: 'static,
88 P::ServiceResponse: 'static,
89 P::ServiceRequest: 'static,
90 S::Request: From<P::ServiceRequest>,
91 S::Response: Into<P::ServiceResponse>,
92 S::Error: Into<P::ServiceError>,
93 {
94 let new_service = Arc::new(new_service);
95 self.with_handle(move |_| new_service.clone())
96 }
97
98 pub fn with_handle<F, S>(&self, new_service: F) where
107 F: Fn(&Handle) -> S + Send + Sync + 'static,
108 S: NewService + Send + Sync + 'static,
109 S::Instance: 'static,
110 P::ServiceError: 'static,
111 P::ServiceResponse: 'static,
112 P::ServiceRequest: 'static,
113 S::Request: From<P::ServiceRequest>,
114 S::Response: Into<P::ServiceResponse>,
115 S::Error: Into<P::ServiceError>,
116 {
117 let proto = self.proto.clone();
118 let new_service = Arc::new(new_service);
119 let addr = self.addr;
120 let workers = self.threads;
121
122 let threads = (0..self.threads - 1).map(|i| {
123 let proto = proto.clone();
124 let new_service = new_service.clone();
125
126 thread::Builder::new().name(format!("worker{}", i)).spawn(move || {
127 serve(proto, addr, workers, &*new_service)
128 }).unwrap()
129 }).collect::<Vec<_>>();
130
131 serve(proto, addr, workers, &*new_service);
132
133 for thread in threads {
134 thread.join().unwrap();
135 }
136 }
137}
138
139fn serve<P, Kind, F, S>(binder: Arc<P>, addr: SocketAddr, workers: usize, new_service: &F)
140 where P: BindServer<Kind, TcpStream>,
141 F: Fn(&Handle) -> S,
142 S: NewService + Send + Sync,
143 S::Instance: 'static,
144 P::ServiceError: 'static,
145 P::ServiceResponse: 'static,
146 P::ServiceRequest: 'static,
147 S::Request: From<P::ServiceRequest>,
148 S::Response: Into<P::ServiceResponse>,
149 S::Error: Into<P::ServiceError>,
150{
151 struct WrapService<S, Request, Response, Error> {
152 inner: S,
153 _marker: PhantomData<fn() -> (Request, Response, Error)>,
154 }
155
156 impl<S, Request, Response, Error> Service for WrapService<S, Request, Response, Error>
157 where S: Service,
158 S::Request: From<Request>,
159 S::Response: Into<Response>,
160 S::Error: Into<Error>,
161 {
162 type Request = Request;
163 type Response = Response;
164 type Error = Error;
165 type Future = Then<S::Future,
166 Result<Response, Error>,
167 fn(Result<S::Response, S::Error>) -> Result<Response, Error>>;
168
169 fn call(&self, req: Request) -> Self::Future {
170 fn change_types<A, B, C, D>(r: Result<A, B>) -> Result<C, D>
171 where A: Into<C>,
172 B: Into<D>,
173 {
174 match r {
175 Ok(e) => Ok(e.into()),
176 Err(e) => Err(e.into()),
177 }
178 }
179
180 self.inner.call(S::Request::from(req)).then(change_types)
181 }
182 }
183
184 let mut core = Core::new().unwrap();
185 let handle = core.handle();
186 let new_service = new_service(&handle);
187 let listener = listener(&addr, workers, &handle).unwrap();
188
189 let server = listener.incoming().for_each(move |(socket, _)| {
190 let service = try!(new_service.new_service());
192
193 binder.bind_server(&handle, socket, WrapService {
195 inner: service,
196 _marker: PhantomData,
197 });
198
199 Ok(())
200 });
201
202 core.run(server).unwrap();
203}
204
205fn listener(addr: &SocketAddr,
206 workers: usize,
207 handle: &Handle) -> io::Result<TcpListener> {
208 let listener = match *addr {
209 SocketAddr::V4(_) => try!(net2::TcpBuilder::new_v4()),
210 SocketAddr::V6(_) => try!(net2::TcpBuilder::new_v6()),
211 };
212 try!(configure_tcp(workers, &listener));
213 try!(listener.reuse_address(true));
214 try!(listener.bind(addr));
215 listener.listen(1024).and_then(|l| {
216 TcpListener::from_listener(l, addr, handle)
217 })
218}
219
220#[cfg(unix)]
221fn configure_tcp(workers: usize, tcp: &net2::TcpBuilder) -> io::Result<()> {
222 use net2::unix::*;
223
224 if workers > 1 {
225 try!(tcp.reuse_port(true));
226 }
227
228 Ok(())
229}
230
/// Fallback socket setup for every non-Unix target: applies no options.
///
/// `TcpServer::threads` only records a thread count when `cfg!(unix)` holds,
/// so outside Unix the server always runs a single worker and never needs
/// port reuse. Gating on `not(unix)` rather than `windows` mirrors that check
/// and ensures exactly one `configure_tcp` exists on every target.
///
/// # Errors
///
/// Never fails; always returns `Ok(())`.
#[cfg(not(unix))]
fn configure_tcp(_workers: usize, _tcp: &net2::TcpBuilder) -> io::Result<()> {
    Ok(())
}