hyper_sync/
net.rs

1//! A collection of traits abstracting over Listeners and Streams.
2use std::any::{Any, TypeId};
3use std::fmt;
4use std::io::{self, ErrorKind, Read, Write};
5use std::net::{SocketAddr, ToSocketAddrs, TcpStream, TcpListener, Shutdown};
6use std::mem;
7use std::sync::Arc;
8
9use std::time::Duration;
10
/// The write-status indicating headers have not been written.
///
/// This enum has no variants, so a value of it can never be constructed; it
/// is only usable as a type-level marker.
pub enum Fresh {}
13
/// The write-status indicating headers have been written.
///
/// This enum has no variants, so a value of it can never be constructed; it
/// is only usable as a type-level marker.
pub enum Streaming {}
16
/// An abstraction to listen for connections on a certain port.
///
/// Implementors must be `Clone`. Each accepted connection is handed out as
/// the associated `Stream` type.
pub trait NetworkListener: Clone {
    /// The stream produced for each connection.
    type Stream: NetworkStream + Send + Clone;

    /// Accepts a single incoming connection and returns its stream.
    fn accept(&mut self) -> ::Result<Self::Stream>;

    /// Get the address this Listener ended up listening on.
    fn local_addr(&mut self) -> io::Result<SocketAddr>;

    /// Returns an iterator over incoming connections.
    ///
    /// The returned `NetworkConnections` holds a mutable borrow of this
    /// listener for as long as it is alive.
    fn incoming(&mut self) -> NetworkConnections<Self> {
        NetworkConnections(self)
    }

    /// Sets the read timeout for all streams that are accepted
    ///
    /// The default implementation ignores the value and only logs a warning.
    fn set_read_timeout(&mut self, _: Option<Duration>) {
        // This default implementation is only here to prevent the addition of
        // these methods from being a breaking change. They should be removed
        // when the next breaking release is made.
        warn!("Ignoring read timeout");
    }

    /// Sets the write timeout for all streams that are accepted
    ///
    /// The default implementation ignores the value and only logs a warning.
    fn set_write_timeout(&mut self, _: Option<Duration>) {
        // This default implementation is only here to prevent the addition of
        // these methods from being a breaking change. They should be removed
        // when the next breaking release is made.
        warn!("Ignoring write timeout");
    }
}
49
/// An iterator wrapper over a `NetworkListener`.
///
/// Holds a mutable borrow of the listener for the lifetime `'a`.
pub struct NetworkConnections<'a, N: NetworkListener + 'a>(&'a mut N);
52
53impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
54    type Item = ::Result<N::Stream>;
55    fn next(&mut self) -> Option<::Result<N::Stream>> {
56        Some(self.0.accept())
57    }
58}
59
/// An abstraction over streams that a `Server` can utilize.
///
/// A stream is readable, writable, `Send`, and carries enough type
/// information (`Any` + `::GetType`) to be downcast back to its concrete type.
pub trait NetworkStream: Read + Write + Any + Send + ::GetType {
    /// Get the remote address of the underlying connection.
    fn peer_addr(&mut self) -> io::Result<SocketAddr>;

    /// Set the maximum time to wait for a read to complete.
    ///
    /// `None` is forwarded as-is to the implementation.
    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;

    /// Set the maximum time to wait for a write to complete.
    ///
    /// `None` is forwarded as-is to the implementation.
    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;

    /// This will be called when Stream should no longer be kept alive.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    #[inline]
    fn close(&mut self, _how: Shutdown) -> io::Result<()> {
        Ok(())
    }

    // Unsure about name and implementation...

    // Records whether the previous response was expected to carry no content.
    // The default implementation discards the flag.
    #[doc(hidden)]
    fn set_previous_response_expected_no_content(&mut self, _expected: bool) { }

    // Reports the flag stored by `set_previous_response_expected_no_content`.
    // The default implementation always answers `false`.
    #[doc(hidden)]
    fn previous_response_expected_no_content(&self) -> bool {
        false
    }
}
87
/// A connector creates a NetworkStream.
pub trait NetworkConnector {
    /// Type of `Stream` to create
    ///
    /// It must be convertible into a boxed `NetworkStream + Send` trait object.
    type Stream: Into<Box<NetworkStream + Send>>;

    /// Connect to a remote address.
    ///
    /// `host` and `port` identify the remote end and `scheme` is the URL
    /// scheme of the request; how `scheme` is interpreted is up to the
    /// implementation.
    fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<Self::Stream>;
}
96
97impl<T: NetworkStream + Send> From<T> for Box<NetworkStream + Send> {
98    fn from(s: T) -> Box<NetworkStream + Send> {
99        Box::new(s)
100    }
101}
102
103impl fmt::Debug for Box<NetworkStream + Send> {
104    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
105        fmt.pad("Box<NetworkStream>")
106    }
107}
108
109impl NetworkStream {
110    unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
111        &*(mem::transmute::<*const _, (*const (), *const ())>(self).0 as *const T)
112    }
113
114    unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
115        &mut *(mem::transmute::<*mut _, (*mut (), *mut ())>(self).0 as *mut T)
116    }
117
118    unsafe fn downcast_unchecked<T: 'static>(self: Box<NetworkStream>) -> Box<T>  {
119        Box::from_raw(mem::transmute::<*mut _, (*mut (), *mut ())>(Box::into_raw(self)).0 as *mut T)
120    }
121}
122
123impl NetworkStream {
124    /// Is the underlying type in this trait object a `T`?
125    #[inline]
126    pub fn is<T: Any>(&self) -> bool {
127        (*self).get_type() == TypeId::of::<T>()
128    }
129
130    /// If the underlying type is `T`, get a reference to the contained data.
131    #[inline]
132    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
133        if self.is::<T>() {
134            Some(unsafe { self.downcast_ref_unchecked() })
135        } else {
136            None
137        }
138    }
139
140    /// If the underlying type is `T`, get a mutable reference to the contained
141    /// data.
142    #[inline]
143    pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
144        if self.is::<T>() {
145            Some(unsafe { self.downcast_mut_unchecked() })
146        } else {
147            None
148        }
149    }
150
151    /// If the underlying type is `T`, extract it.
152    #[inline]
153    pub fn downcast<T: Any>(self: Box<NetworkStream>)
154            -> Result<Box<T>, Box<NetworkStream>> {
155        if self.is::<T>() {
156            Ok(unsafe { self.downcast_unchecked() })
157        } else {
158            Err(self)
159        }
160    }
161}
162
163impl NetworkStream + Send {
164    unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
165        &*(mem::transmute::<*const _, (*const (), *const ())>(self).0 as *const T)
166    }
167
168    unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
169        &mut *(mem::transmute::<*mut _, (*mut (), *mut ())>(self).0 as *mut T)
170    }
171
172    unsafe fn downcast_unchecked<T: 'static>(self: Box<NetworkStream + Send>) -> Box<T>  {
173        Box::from_raw(mem::transmute::<*mut _, (*mut (), *mut ())>(Box::into_raw(self)).0 as *mut T)
174    }
175}
176
177impl NetworkStream + Send {
178    /// Is the underlying type in this trait object a `T`?
179    #[inline]
180    pub fn is<T: Any>(&self) -> bool {
181        (*self).get_type() == TypeId::of::<T>()
182    }
183
184    /// If the underlying type is `T`, get a reference to the contained data.
185    #[inline]
186    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
187        if self.is::<T>() {
188            Some(unsafe { self.downcast_ref_unchecked() })
189        } else {
190            None
191        }
192    }
193
194    /// If the underlying type is `T`, get a mutable reference to the contained
195    /// data.
196    #[inline]
197    pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
198        if self.is::<T>() {
199            Some(unsafe { self.downcast_mut_unchecked() })
200        } else {
201            None
202        }
203    }
204
205    /// If the underlying type is `T`, extract it.
206    #[inline]
207    pub fn downcast<T: Any>(self: Box<NetworkStream + Send>)
208            -> Result<Box<T>, Box<NetworkStream + Send>> {
209        if self.is::<T>() {
210            Ok(unsafe { self.downcast_unchecked() })
211        } else {
212            Err(self)
213        }
214    }
215}
216
/// A `NetworkListener` for `HttpStream`s.
///
/// The `TcpListener` is kept behind an `Arc`, so clones of an `HttpListener`
/// share the same underlying socket. The timeouts are stored per value.
#[derive(Clone)]
pub struct HttpListener {
    // Shared handle to the bound socket.
    listener: Arc<TcpListener>,

    // Timeouts applied to every stream handed out by `accept`.
    read_timeout : Option<Duration>,
    write_timeout: Option<Duration>,
}
225
226impl From<TcpListener> for HttpListener {
227    fn from(listener: TcpListener) -> HttpListener {
228        HttpListener {
229            listener: Arc::new(listener),
230
231            read_timeout : None,
232            write_timeout: None,
233        }
234    }
235}
236
237impl HttpListener {
238    /// Start listening to an address over HTTP.
239    pub fn new<To: ToSocketAddrs>(addr: To) -> ::Result<HttpListener> {
240        Ok(HttpListener::from(try!(TcpListener::bind(addr))))
241    }
242}
243
244impl NetworkListener for HttpListener {
245    type Stream = HttpStream;
246
247    #[inline]
248    fn accept(&mut self) -> ::Result<HttpStream> {
249        let stream = HttpStream(try!(self.listener.accept()).0);
250        try!(stream.set_read_timeout(self.read_timeout));
251        try!(stream.set_write_timeout(self.write_timeout));
252        Ok(stream)
253    }
254
255    #[inline]
256    fn local_addr(&mut self) -> io::Result<SocketAddr> {
257        self.listener.local_addr()
258    }
259
260    fn set_read_timeout(&mut self, duration: Option<Duration>) {
261        self.read_timeout = duration;
262    }
263
264    fn set_write_timeout(&mut self, duration: Option<Duration>) {
265        self.write_timeout = duration;
266    }
267}
268
#[cfg(windows)]
impl ::std::os::windows::io::AsRawSocket for HttpListener {
    /// Returns the raw socket of the wrapped listener.
    fn as_raw_socket(&self) -> ::std::os::windows::io::RawSocket {
        self.listener.as_raw_socket()
    }
}
275
#[cfg(windows)]
impl ::std::os::windows::io::FromRawSocket for HttpListener {
    /// Builds an `HttpListener` around a raw socket.
    ///
    /// # Safety
    ///
    /// `sock` is forwarded unchanged to `TcpListener::from_raw_socket`, so
    /// that function's requirements on the socket apply here as well.
    unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpListener {
        HttpListener::from(TcpListener::from_raw_socket(sock))
    }
}
282
#[cfg(unix)]
impl ::std::os::unix::io::AsRawFd for HttpListener {
    /// Returns the raw file descriptor of the wrapped listener.
    fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd {
        self.listener.as_raw_fd()
    }
}
289
#[cfg(unix)]
impl ::std::os::unix::io::FromRawFd for HttpListener {
    /// Builds an `HttpListener` around a raw file descriptor.
    ///
    /// # Safety
    ///
    /// `fd` is forwarded unchanged to `TcpListener::from_raw_fd`, so that
    /// function's requirements on the descriptor apply here as well.
    unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpListener {
        HttpListener::from(TcpListener::from_raw_fd(fd))
    }
}
296
/// A wrapper around a `TcpStream`.
///
/// The wrapped stream is a public field and can be accessed as `.0`.
pub struct HttpStream(pub TcpStream);
299
300impl Clone for HttpStream {
301    #[inline]
302    fn clone(&self) -> HttpStream {
303        HttpStream(self.0.try_clone().unwrap())
304    }
305}
306
307impl fmt::Debug for HttpStream {
308    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
309        f.write_str("HttpStream(_)")
310    }
311}
312
313impl Read for HttpStream {
314    #[inline]
315    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
316        self.0.read(buf)
317    }
318}
319
320impl Write for HttpStream {
321    #[inline]
322    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
323        self.0.write(msg)
324    }
325    #[inline]
326    fn flush(&mut self) -> io::Result<()> {
327        self.0.flush()
328    }
329}
330
#[cfg(windows)]
impl ::std::os::windows::io::AsRawSocket for HttpStream {
    /// Returns the raw socket of the wrapped `TcpStream`.
    fn as_raw_socket(&self) -> ::std::os::windows::io::RawSocket {
        self.0.as_raw_socket()
    }
}
337
#[cfg(windows)]
impl ::std::os::windows::io::FromRawSocket for HttpStream {
    /// Builds an `HttpStream` around a raw socket.
    ///
    /// # Safety
    ///
    /// `sock` is forwarded unchanged to `TcpStream::from_raw_socket`, so that
    /// function's requirements on the socket apply here as well.
    unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpStream {
        HttpStream(TcpStream::from_raw_socket(sock))
    }
}
344
#[cfg(unix)]
impl ::std::os::unix::io::AsRawFd for HttpStream {
    /// Returns the raw file descriptor of the wrapped `TcpStream`.
    fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd {
        self.0.as_raw_fd()
    }
}
351
#[cfg(unix)]
impl ::std::os::unix::io::FromRawFd for HttpStream {
    /// Builds an `HttpStream` around a raw file descriptor.
    ///
    /// # Safety
    ///
    /// `fd` is forwarded unchanged to `TcpStream::from_raw_fd`, so that
    /// function's requirements on the descriptor apply here as well.
    unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpStream {
        HttpStream(TcpStream::from_raw_fd(fd))
    }
}
358
359impl NetworkStream for HttpStream {
360    #[inline]
361    fn peer_addr(&mut self) -> io::Result<SocketAddr> {
362            self.0.peer_addr()
363    }
364
365    #[inline]
366    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
367        self.0.set_read_timeout(dur)
368    }
369
370    #[inline]
371    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
372        self.0.set_write_timeout(dur)
373    }
374
375    #[inline]
376    fn close(&mut self, how: Shutdown) -> io::Result<()> {
377        match self.0.shutdown(how) {
378            Ok(_) => Ok(()),
379            // see https://github.com/hyperium/hyper/issues/508
380            Err(ref e) if e.kind() == ErrorKind::NotConnected => Ok(()),
381            err => err
382        }
383    }
384}
385
/// A connector that will produce HttpStreams.
///
/// This is a unit struct: it carries no configuration.
#[derive(Debug, Clone, Default)]
pub struct HttpConnector;
389
390impl NetworkConnector for HttpConnector {
391    type Stream = HttpStream;
392
393    fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<HttpStream> {
394        let addr = &(host, port);
395        Ok(try!(match scheme {
396            "http" => {
397                debug!("http scheme");
398                Ok(HttpStream(try!(TcpStream::connect(addr))))
399            },
400            _ => {
401                Err(io::Error::new(io::ErrorKind::InvalidInput,
402                                "Invalid scheme for Http"))
403            }
404        }))
405    }
406}
407
408/// A closure as a connector used to generate `TcpStream`s per request
409///
410/// # Example
411///
412/// Basic example:
413///
414/// ```norun
415/// Client::with_connector(|addr: &str, port: u16, scheme: &str| {
416///     TcpStream::connect(&(addr, port))
417/// });
418/// ```
419///
420/// Example using `TcpBuilder` from the net2 crate if you want to configure your source socket:
421///
422/// ```norun
423/// Client::with_connector(|addr: &str, port: u16, scheme: &str| {
424///     let b = try!(TcpBuilder::new_v4());
425///     try!(b.bind("127.0.0.1:0"));
426///     b.connect(&(addr, port))
427/// });
428/// ```
429impl<F> NetworkConnector for F where F: Fn(&str, u16, &str) -> io::Result<TcpStream> {
430    type Stream = HttpStream;
431
432    fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<HttpStream> {
433        Ok(HttpStream(try!((*self)(host, port, scheme))))
434    }
435}
436
/// An abstraction to allow any SSL implementation to be used with client-side HttpsStreams.
///
/// `T` is the type of the unprotected stream being wrapped; it defaults to
/// `HttpStream`.
pub trait SslClient<T: NetworkStream + Send + Clone = HttpStream> {
    /// The protected stream.
    type Stream: NetworkStream + Send + Clone;
    /// Wrap a client stream with SSL.
    ///
    /// Takes ownership of `stream`; `host` is the host name the connection
    /// was opened for.
    fn wrap_client(&self, stream: T, host: &str) -> ::Result<Self::Stream>;
}
444
/// An abstraction to allow any SSL implementation to be used with server-side HttpsStreams.
///
/// `T` is the type of the unprotected stream being wrapped; it defaults to
/// `HttpStream`.
pub trait SslServer<T: NetworkStream + Send + Clone = HttpStream> {
    /// The protected stream.
    type Stream: NetworkStream + Send + Clone;
    /// Wrap a server stream with SSL.
    ///
    /// Takes ownership of `stream`.
    fn wrap_server(&self, stream: T) -> ::Result<Self::Stream>;
}
452
/// A stream over the HTTP protocol, possibly protected by SSL.
///
/// `S` is the type of the SSL-protected stream.
#[derive(Debug, Clone)]
pub enum HttpsStream<S: NetworkStream> {
    /// A plain text stream.
    Http(HttpStream),
    /// A stream protected by SSL.
    Https(S)
}
461
462impl<S: NetworkStream> Read for HttpsStream<S> {
463    #[inline]
464    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
465        match *self {
466            HttpsStream::Http(ref mut s) => s.read(buf),
467            HttpsStream::Https(ref mut s) => s.read(buf)
468        }
469    }
470}
471
472impl<S: NetworkStream> Write for HttpsStream<S> {
473    #[inline]
474    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
475        match *self {
476            HttpsStream::Http(ref mut s) => s.write(msg),
477            HttpsStream::Https(ref mut s) => s.write(msg)
478        }
479    }
480
481    #[inline]
482    fn flush(&mut self) -> io::Result<()> {
483        match *self {
484            HttpsStream::Http(ref mut s) => s.flush(),
485            HttpsStream::Https(ref mut s) => s.flush()
486        }
487    }
488}
489
490impl<S: NetworkStream> NetworkStream for HttpsStream<S> {
491    #[inline]
492    fn peer_addr(&mut self) -> io::Result<SocketAddr> {
493        match *self {
494            HttpsStream::Http(ref mut s) => s.peer_addr(),
495            HttpsStream::Https(ref mut s) => s.peer_addr()
496        }
497    }
498
499    #[inline]
500    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
501        match *self {
502            HttpsStream::Http(ref inner) => inner.0.set_read_timeout(dur),
503            HttpsStream::Https(ref inner) => inner.set_read_timeout(dur)
504        }
505    }
506
507    #[inline]
508    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
509        match *self {
510            HttpsStream::Http(ref inner) => inner.0.set_write_timeout(dur),
511            HttpsStream::Https(ref inner) => inner.set_write_timeout(dur)
512        }
513    }
514
515    #[inline]
516    fn close(&mut self, how: Shutdown) -> io::Result<()> {
517        match *self {
518            HttpsStream::Http(ref mut s) => s.close(how),
519            HttpsStream::Https(ref mut s) => s.close(how)
520        }
521    }
522}
523
/// A Http Listener over SSL.
///
/// Pairs a plain `HttpListener` with the `SslServer` implementation used to
/// wrap each accepted stream.
#[derive(Clone)]
pub struct HttpsListener<S: SslServer> {
    // Accepts the underlying plain connections.
    listener: HttpListener,
    // Wraps each accepted stream with SSL.
    ssl: S,
}
530
531impl<S: SslServer> HttpsListener<S> {
532    /// Start listening to an address over HTTPS.
533    pub fn new<To: ToSocketAddrs>(addr: To, ssl: S) -> ::Result<HttpsListener<S>> {
534        HttpListener::new(addr).map(|l| HttpsListener {
535            listener: l,
536            ssl: ssl
537        })
538    }
539
540    /// Construct an HttpsListener from a bound `TcpListener`.
541    pub fn with_listener(listener: HttpListener, ssl: S) -> HttpsListener<S> {
542        HttpsListener {
543            listener: listener,
544            ssl: ssl
545        }
546    }
547}
548
549impl<S: SslServer + Clone> NetworkListener for HttpsListener<S> {
550    type Stream = S::Stream;
551
552    #[inline]
553    fn accept(&mut self) -> ::Result<S::Stream> {
554        self.listener.accept().and_then(|s| self.ssl.wrap_server(s))
555    }
556
557    #[inline]
558    fn local_addr(&mut self) -> io::Result<SocketAddr> {
559        self.listener.local_addr()
560    }
561
562    fn set_read_timeout(&mut self, duration: Option<Duration>) {
563        self.listener.set_read_timeout(duration)
564    }
565
566    fn set_write_timeout(&mut self, duration: Option<Duration>) {
567        self.listener.set_write_timeout(duration)
568    }
569}
570
/// A connector that can protect HTTP streams using SSL.
///
/// `S` is the SSL implementation and `C` the connector used to open the
/// underlying connection; `C` defaults to `HttpConnector`.
#[derive(Debug, Default)]
pub struct HttpsConnector<S: SslClient, C: NetworkConnector = HttpConnector> {
    // Wraps plain streams with SSL when required.
    ssl: S,
    // Opens the underlying plain connection.
    connector: C,
}
577
578impl<S: SslClient> HttpsConnector<S, HttpConnector> {
579    /// Create a new connector using the provided SSL implementation.
580    pub fn new(s: S) -> HttpsConnector<S, HttpConnector> {
581        HttpsConnector::with_connector(s, HttpConnector)
582    }
583}
584
585impl<S: SslClient, C: NetworkConnector> HttpsConnector<S, C> {
586    /// Create a new connector using the provided SSL implementation.
587    pub fn with_connector(s: S, connector: C) -> HttpsConnector<S, C> {
588        HttpsConnector { ssl: s, connector: connector }
589    }
590}
591
592impl<S: SslClient, C: NetworkConnector<Stream=HttpStream>> NetworkConnector for HttpsConnector<S, C> {
593    type Stream = HttpsStream<S::Stream>;
594
595    fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result<Self::Stream> {
596        let stream = try!(self.connector.connect(host, port, "http"));
597        if scheme == "https" {
598            debug!("https scheme");
599            self.ssl.wrap_client(stream, host).map(HttpsStream::Https)
600        } else {
601            Ok(HttpsStream::Http(stream))
602        }
603    }
604}
605
606
// Alias for `HttpConnector`; kept out of the rendered documentation.
#[doc(hidden)]
pub type DefaultConnector = HttpConnector;
609
#[cfg(test)]
mod tests {
    use mock::MockStream;
    use super::NetworkStream;

    // The checked downcast must give back the concrete `MockStream`.
    #[test]
    fn test_downcast_box_stream() {
        // FIXME: Use Type ascription
        let boxed: Box<NetworkStream + Send> = Box::new(MockStream::new());

        let recovered = boxed.downcast::<MockStream>().ok().unwrap();
        let expected = Box::new(MockStream::new());
        assert_eq!(recovered, expected);
    }

    // The unchecked downcast must give back the same value when the type matches.
    #[test]
    fn test_downcast_unchecked_box_stream() {
        // FIXME: Use Type ascription
        let boxed: Box<NetworkStream + Send> = Box::new(MockStream::new());

        let recovered = unsafe { boxed.downcast_unchecked::<MockStream>() };
        let expected = Box::new(MockStream::new());
        assert_eq!(recovered, expected);
    }
}
633