1use std::any::{Any, TypeId};
3use std::fmt;
4use std::io::{self, ErrorKind, Read, Write};
5use std::net::{SocketAddr, ToSocketAddrs, TcpStream, TcpListener, Shutdown};
6use std::mem;
7use std::sync::Arc;
8
9use std::time::Duration;
10
11pub enum Fresh {}
13
14pub enum Streaming {}
16
17pub trait NetworkListener: Clone {
19 type Stream: NetworkStream + Send + Clone;
21
22 fn accept(&mut self) -> ::Result<Self::Stream>;
24
25 fn local_addr(&mut self) -> io::Result<SocketAddr>;
27
28 fn incoming(&mut self) -> NetworkConnections<Self> {
30 NetworkConnections(self)
31 }
32
33 fn set_read_timeout(&mut self, _: Option<Duration>) {
35 warn!("Ignoring read timeout");
39 }
40
41 fn set_write_timeout(&mut self, _: Option<Duration>) {
43 warn!("Ignoring write timeout");
47 }
48}
49
50pub struct NetworkConnections<'a, N: NetworkListener + 'a>(&'a mut N);
52
53impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
54 type Item = ::Result<N::Stream>;
55 fn next(&mut self) -> Option<::Result<N::Stream>> {
56 Some(self.0.accept())
57 }
58}
59
60pub trait NetworkStream: Read + Write + Any + Send + ::GetType {
62 fn peer_addr(&mut self) -> io::Result<SocketAddr>;
64
65 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
67
68 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
70
71 #[inline]
73 fn close(&mut self, _how: Shutdown) -> io::Result<()> {
74 Ok(())
75 }
76
77 #[doc(hidden)]
80 fn set_previous_response_expected_no_content(&mut self, _expected: bool) { }
81
82 #[doc(hidden)]
83 fn previous_response_expected_no_content(&self) -> bool {
84 false
85 }
86}
87
88pub trait NetworkConnector {
90 type Stream: Into<Box<NetworkStream + Send>>;
92
93 fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<Self::Stream>;
95}
96
97impl<T: NetworkStream + Send> From<T> for Box<NetworkStream + Send> {
98 fn from(s: T) -> Box<NetworkStream + Send> {
99 Box::new(s)
100 }
101}
102
103impl fmt::Debug for Box<NetworkStream + Send> {
104 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
105 fmt.pad("Box<NetworkStream>")
106 }
107}
108
109impl NetworkStream {
110 unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
111 &*(mem::transmute::<*const _, (*const (), *const ())>(self).0 as *const T)
112 }
113
114 unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
115 &mut *(mem::transmute::<*mut _, (*mut (), *mut ())>(self).0 as *mut T)
116 }
117
118 unsafe fn downcast_unchecked<T: 'static>(self: Box<NetworkStream>) -> Box<T> {
119 Box::from_raw(mem::transmute::<*mut _, (*mut (), *mut ())>(Box::into_raw(self)).0 as *mut T)
120 }
121}
122
123impl NetworkStream {
124 #[inline]
126 pub fn is<T: Any>(&self) -> bool {
127 (*self).get_type() == TypeId::of::<T>()
128 }
129
130 #[inline]
132 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
133 if self.is::<T>() {
134 Some(unsafe { self.downcast_ref_unchecked() })
135 } else {
136 None
137 }
138 }
139
140 #[inline]
143 pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
144 if self.is::<T>() {
145 Some(unsafe { self.downcast_mut_unchecked() })
146 } else {
147 None
148 }
149 }
150
151 #[inline]
153 pub fn downcast<T: Any>(self: Box<NetworkStream>)
154 -> Result<Box<T>, Box<NetworkStream>> {
155 if self.is::<T>() {
156 Ok(unsafe { self.downcast_unchecked() })
157 } else {
158 Err(self)
159 }
160 }
161}
162
163impl NetworkStream + Send {
164 unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
165 &*(mem::transmute::<*const _, (*const (), *const ())>(self).0 as *const T)
166 }
167
168 unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
169 &mut *(mem::transmute::<*mut _, (*mut (), *mut ())>(self).0 as *mut T)
170 }
171
172 unsafe fn downcast_unchecked<T: 'static>(self: Box<NetworkStream + Send>) -> Box<T> {
173 Box::from_raw(mem::transmute::<*mut _, (*mut (), *mut ())>(Box::into_raw(self)).0 as *mut T)
174 }
175}
176
177impl NetworkStream + Send {
178 #[inline]
180 pub fn is<T: Any>(&self) -> bool {
181 (*self).get_type() == TypeId::of::<T>()
182 }
183
184 #[inline]
186 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
187 if self.is::<T>() {
188 Some(unsafe { self.downcast_ref_unchecked() })
189 } else {
190 None
191 }
192 }
193
194 #[inline]
197 pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
198 if self.is::<T>() {
199 Some(unsafe { self.downcast_mut_unchecked() })
200 } else {
201 None
202 }
203 }
204
205 #[inline]
207 pub fn downcast<T: Any>(self: Box<NetworkStream + Send>)
208 -> Result<Box<T>, Box<NetworkStream + Send>> {
209 if self.is::<T>() {
210 Ok(unsafe { self.downcast_unchecked() })
211 } else {
212 Err(self)
213 }
214 }
215}
216
217#[derive(Clone)]
219pub struct HttpListener {
220 listener: Arc<TcpListener>,
221
222 read_timeout : Option<Duration>,
223 write_timeout: Option<Duration>,
224}
225
226impl From<TcpListener> for HttpListener {
227 fn from(listener: TcpListener) -> HttpListener {
228 HttpListener {
229 listener: Arc::new(listener),
230
231 read_timeout : None,
232 write_timeout: None,
233 }
234 }
235}
236
237impl HttpListener {
238 pub fn new<To: ToSocketAddrs>(addr: To) -> ::Result<HttpListener> {
240 Ok(HttpListener::from(try!(TcpListener::bind(addr))))
241 }
242}
243
244impl NetworkListener for HttpListener {
245 type Stream = HttpStream;
246
247 #[inline]
248 fn accept(&mut self) -> ::Result<HttpStream> {
249 let stream = HttpStream(try!(self.listener.accept()).0);
250 try!(stream.set_read_timeout(self.read_timeout));
251 try!(stream.set_write_timeout(self.write_timeout));
252 Ok(stream)
253 }
254
255 #[inline]
256 fn local_addr(&mut self) -> io::Result<SocketAddr> {
257 self.listener.local_addr()
258 }
259
260 fn set_read_timeout(&mut self, duration: Option<Duration>) {
261 self.read_timeout = duration;
262 }
263
264 fn set_write_timeout(&mut self, duration: Option<Duration>) {
265 self.write_timeout = duration;
266 }
267}
268
269#[cfg(windows)]
270impl ::std::os::windows::io::AsRawSocket for HttpListener {
271 fn as_raw_socket(&self) -> ::std::os::windows::io::RawSocket {
272 self.listener.as_raw_socket()
273 }
274}
275
276#[cfg(windows)]
277impl ::std::os::windows::io::FromRawSocket for HttpListener {
278 unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpListener {
279 HttpListener::from(TcpListener::from_raw_socket(sock))
280 }
281}
282
283#[cfg(unix)]
284impl ::std::os::unix::io::AsRawFd for HttpListener {
285 fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd {
286 self.listener.as_raw_fd()
287 }
288}
289
290#[cfg(unix)]
291impl ::std::os::unix::io::FromRawFd for HttpListener {
292 unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpListener {
293 HttpListener::from(TcpListener::from_raw_fd(fd))
294 }
295}
296
297pub struct HttpStream(pub TcpStream);
299
300impl Clone for HttpStream {
301 #[inline]
302 fn clone(&self) -> HttpStream {
303 HttpStream(self.0.try_clone().unwrap())
304 }
305}
306
307impl fmt::Debug for HttpStream {
308 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
309 f.write_str("HttpStream(_)")
310 }
311}
312
313impl Read for HttpStream {
314 #[inline]
315 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
316 self.0.read(buf)
317 }
318}
319
320impl Write for HttpStream {
321 #[inline]
322 fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
323 self.0.write(msg)
324 }
325 #[inline]
326 fn flush(&mut self) -> io::Result<()> {
327 self.0.flush()
328 }
329}
330
331#[cfg(windows)]
332impl ::std::os::windows::io::AsRawSocket for HttpStream {
333 fn as_raw_socket(&self) -> ::std::os::windows::io::RawSocket {
334 self.0.as_raw_socket()
335 }
336}
337
338#[cfg(windows)]
339impl ::std::os::windows::io::FromRawSocket for HttpStream {
340 unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpStream {
341 HttpStream(TcpStream::from_raw_socket(sock))
342 }
343}
344
345#[cfg(unix)]
346impl ::std::os::unix::io::AsRawFd for HttpStream {
347 fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd {
348 self.0.as_raw_fd()
349 }
350}
351
352#[cfg(unix)]
353impl ::std::os::unix::io::FromRawFd for HttpStream {
354 unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpStream {
355 HttpStream(TcpStream::from_raw_fd(fd))
356 }
357}
358
359impl NetworkStream for HttpStream {
360 #[inline]
361 fn peer_addr(&mut self) -> io::Result<SocketAddr> {
362 self.0.peer_addr()
363 }
364
365 #[inline]
366 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
367 self.0.set_read_timeout(dur)
368 }
369
370 #[inline]
371 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
372 self.0.set_write_timeout(dur)
373 }
374
375 #[inline]
376 fn close(&mut self, how: Shutdown) -> io::Result<()> {
377 match self.0.shutdown(how) {
378 Ok(_) => Ok(()),
379 Err(ref e) if e.kind() == ErrorKind::NotConnected => Ok(()),
381 err => err
382 }
383 }
384}
385
386#[derive(Debug, Clone, Default)]
388pub struct HttpConnector;
389
390impl NetworkConnector for HttpConnector {
391 type Stream = HttpStream;
392
393 fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<HttpStream> {
394 let addr = &(host, port);
395 Ok(try!(match scheme {
396 "http" => {
397 debug!("http scheme");
398 Ok(HttpStream(try!(TcpStream::connect(addr))))
399 },
400 _ => {
401 Err(io::Error::new(io::ErrorKind::InvalidInput,
402 "Invalid scheme for Http"))
403 }
404 }))
405 }
406}
407
408impl<F> NetworkConnector for F where F: Fn(&str, u16, &str) -> io::Result<TcpStream> {
430 type Stream = HttpStream;
431
432 fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<HttpStream> {
433 Ok(HttpStream(try!((*self)(host, port, scheme))))
434 }
435}
436
437pub trait SslClient<T: NetworkStream + Send + Clone = HttpStream> {
439 type Stream: NetworkStream + Send + Clone;
441 fn wrap_client(&self, stream: T, host: &str) -> ::Result<Self::Stream>;
443}
444
445pub trait SslServer<T: NetworkStream + Send + Clone = HttpStream> {
447 type Stream: NetworkStream + Send + Clone;
449 fn wrap_server(&self, stream: T) -> ::Result<Self::Stream>;
451}
452
453#[derive(Debug, Clone)]
455pub enum HttpsStream<S: NetworkStream> {
456 Http(HttpStream),
458 Https(S)
460}
461
462impl<S: NetworkStream> Read for HttpsStream<S> {
463 #[inline]
464 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
465 match *self {
466 HttpsStream::Http(ref mut s) => s.read(buf),
467 HttpsStream::Https(ref mut s) => s.read(buf)
468 }
469 }
470}
471
472impl<S: NetworkStream> Write for HttpsStream<S> {
473 #[inline]
474 fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
475 match *self {
476 HttpsStream::Http(ref mut s) => s.write(msg),
477 HttpsStream::Https(ref mut s) => s.write(msg)
478 }
479 }
480
481 #[inline]
482 fn flush(&mut self) -> io::Result<()> {
483 match *self {
484 HttpsStream::Http(ref mut s) => s.flush(),
485 HttpsStream::Https(ref mut s) => s.flush()
486 }
487 }
488}
489
490impl<S: NetworkStream> NetworkStream for HttpsStream<S> {
491 #[inline]
492 fn peer_addr(&mut self) -> io::Result<SocketAddr> {
493 match *self {
494 HttpsStream::Http(ref mut s) => s.peer_addr(),
495 HttpsStream::Https(ref mut s) => s.peer_addr()
496 }
497 }
498
499 #[inline]
500 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
501 match *self {
502 HttpsStream::Http(ref inner) => inner.0.set_read_timeout(dur),
503 HttpsStream::Https(ref inner) => inner.set_read_timeout(dur)
504 }
505 }
506
507 #[inline]
508 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
509 match *self {
510 HttpsStream::Http(ref inner) => inner.0.set_write_timeout(dur),
511 HttpsStream::Https(ref inner) => inner.set_write_timeout(dur)
512 }
513 }
514
515 #[inline]
516 fn close(&mut self, how: Shutdown) -> io::Result<()> {
517 match *self {
518 HttpsStream::Http(ref mut s) => s.close(how),
519 HttpsStream::Https(ref mut s) => s.close(how)
520 }
521 }
522}
523
524#[derive(Clone)]
526pub struct HttpsListener<S: SslServer> {
527 listener: HttpListener,
528 ssl: S,
529}
530
531impl<S: SslServer> HttpsListener<S> {
532 pub fn new<To: ToSocketAddrs>(addr: To, ssl: S) -> ::Result<HttpsListener<S>> {
534 HttpListener::new(addr).map(|l| HttpsListener {
535 listener: l,
536 ssl: ssl
537 })
538 }
539
540 pub fn with_listener(listener: HttpListener, ssl: S) -> HttpsListener<S> {
542 HttpsListener {
543 listener: listener,
544 ssl: ssl
545 }
546 }
547}
548
549impl<S: SslServer + Clone> NetworkListener for HttpsListener<S> {
550 type Stream = S::Stream;
551
552 #[inline]
553 fn accept(&mut self) -> ::Result<S::Stream> {
554 self.listener.accept().and_then(|s| self.ssl.wrap_server(s))
555 }
556
557 #[inline]
558 fn local_addr(&mut self) -> io::Result<SocketAddr> {
559 self.listener.local_addr()
560 }
561
562 fn set_read_timeout(&mut self, duration: Option<Duration>) {
563 self.listener.set_read_timeout(duration)
564 }
565
566 fn set_write_timeout(&mut self, duration: Option<Duration>) {
567 self.listener.set_write_timeout(duration)
568 }
569}
570
571#[derive(Debug, Default)]
573pub struct HttpsConnector<S: SslClient, C: NetworkConnector = HttpConnector> {
574 ssl: S,
575 connector: C,
576}
577
578impl<S: SslClient> HttpsConnector<S, HttpConnector> {
579 pub fn new(s: S) -> HttpsConnector<S, HttpConnector> {
581 HttpsConnector::with_connector(s, HttpConnector)
582 }
583}
584
585impl<S: SslClient, C: NetworkConnector> HttpsConnector<S, C> {
586 pub fn with_connector(s: S, connector: C) -> HttpsConnector<S, C> {
588 HttpsConnector { ssl: s, connector: connector }
589 }
590}
591
592impl<S: SslClient, C: NetworkConnector<Stream=HttpStream>> NetworkConnector for HttpsConnector<S, C> {
593 type Stream = HttpsStream<S::Stream>;
594
595 fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<Self::Stream> {
596 let stream = try!(self.connector.connect(host, port, "http"));
597 if scheme == "https" {
598 debug!("https scheme");
599 self.ssl.wrap_client(stream, host).map(HttpsStream::Https)
600 } else {
601 Ok(HttpsStream::Http(stream))
602 }
603 }
604}
605
606
607#[doc(hidden)]
608pub type DefaultConnector = HttpConnector;
609
610#[cfg(test)]
611mod tests {
612 use mock::MockStream;
613 use super::{NetworkStream};
614
615 #[test]
616 fn test_downcast_box_stream() {
617 let stream: Box<NetworkStream + Send> = Box::new(MockStream::new());
619
620 let mock = stream.downcast::<MockStream>().ok().unwrap();
621 assert_eq!(mock, Box::new(MockStream::new()));
622 }
623
624 #[test]
625 fn test_downcast_unchecked_box_stream() {
626 let stream: Box<NetworkStream + Send> = Box::new(MockStream::new());
628
629 let mock = unsafe { stream.downcast_unchecked::<MockStream>() };
630 assert_eq!(mock, Box::new(MockStream::new()));
631 }
632}
633