cogo_http/
net.rs

1//! A collection of traits abstracting over Listeners and Streams.
2use std::any::{Any, TypeId};
3use std::fmt;
4use std::io::{self, ErrorKind, Read, Write};
5use std::net::{SocketAddr, ToSocketAddrs, Shutdown};
6use std::mem;
7use std::sync::Arc;
8
9use std::time::Duration;
10use crate::runtime::TcpListener;
11
12use typeable::Typeable;
13use traitobject;
14use crate::runtime::TcpStream;
15
/// The write-status indicating headers have not been written.
///
/// This enum has no variants, so no value of it can be constructed; it is
/// only usable as a type-level marker.
pub enum Fresh {}
18
/// The write-status indicating headers have been written.
///
/// This enum has no variants, so no value of it can be constructed; it is
/// only usable as a type-level marker.
pub enum Streaming {}
21
/// An abstraction to listen for connections on a certain port.
pub trait NetworkListener: Clone {
    /// The stream produced for each connection.
    type Stream: NetworkStream + Send + Clone;

    /// Accepts one connection and returns its stream, or the error that
    /// prevented it.
    fn accept(&mut self) -> crate::Result<Self::Stream>;

    /// Get the address this Listener ended up listening on.
    fn local_addr(&mut self) -> io::Result<SocketAddr>;

    /// Returns an iterator over incoming connections.
    ///
    /// Each item is the result of one call to `accept`; the iterator never
    /// yields `None`.
    fn incoming(&mut self) -> NetworkConnections<Self> {
        NetworkConnections(self)
    }

    /// Sets the read timeout for all streams that are accepted
    ///
    /// The default implementation ignores the value and only logs a warning.
    fn set_read_timeout(&mut self, _: Option<Duration>) {
        // This default implementation is only here to prevent the addition of
        // these methods from being a breaking change. They should be removed
        // when the next breaking release is made.
        warn!("Ignoring read timeout");
    }

    /// Sets the write timeout for all streams that are accepted
    ///
    /// The default implementation ignores the value and only logs a warning.
    fn set_write_timeout(&mut self, _: Option<Duration>) {
        // This default implementation is only here to prevent the addition of
        // these methods from being a breaking change. They should be removed
        // when the next breaking release is made.
        warn!("Ignoring write timeout");
    }
}
54
55/// An iterator wrapper over a `NetworkAcceptor`.
56pub struct NetworkConnections<'a, N: NetworkListener + 'a>(&'a mut N);
57
58impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
59    type Item = crate::Result<N::Stream>;
60    fn next(&mut self) -> Option<crate::Result<N::Stream>> {
61        Some(self.0.accept())
62    }
63}
64
/// An abstraction over streams that a `Server` can utilize.
pub trait NetworkStream: Read + Write + Any + Send + Typeable {
    /// Get the remote address of the underlying connection.
    fn peer_addr(&mut self) -> io::Result<SocketAddr>;

    /// Set the maximum time to wait for a read to complete.
    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;

    /// Set the maximum time to wait for a write to complete.
    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;

    /// This will be called when Stream should no longer be kept alive.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    #[inline]
    fn close(&mut self, _how: Shutdown) -> io::Result<()> {
        Ok(())
    }

    // Unsure about name and implementation...

    /// Records whether the previous response was expected to carry no
    /// content. The default implementation discards the value.
    #[doc(hidden)]
    fn set_previous_response_expected_no_content(&mut self, _expected: bool) {}

    /// Reports the flag stored by
    /// `set_previous_response_expected_no_content`. The default
    /// implementation always returns `false`.
    #[doc(hidden)]
    fn previous_response_expected_no_content(&self) -> bool {
        false
    }

    /// Requests blocking (`false`) or non-blocking (`true`) mode.
    ///
    /// NOTE(review): the `HttpStream` implementation forwards this to the
    /// wrapped `TcpStream` on unix only and does nothing elsewhere; the exact
    /// contract for other implementors is not stated here - confirm.
    fn set_nonblocking(&self, b: bool);

    /// Resets the stream's I/O readiness state.
    ///
    /// NOTE(review): the `HttpStream` implementation forwards to
    /// `cogo::io::WaitIo::reset_io` on unix only; semantics are defined by
    /// that trait - confirm against its documentation.
    fn reset_io(&self);

    /// Waits until the stream is ready for I/O.
    ///
    /// NOTE(review): the `HttpStream` implementation forwards to
    /// `cogo::io::WaitIo::wait_io` on unix only; semantics are defined by
    /// that trait - confirm against its documentation.
    fn wait_io(&self);
}
98
/// A connector creates a NetworkStream.
pub trait NetworkConnector {
    /// Type of `Stream` to create
    ///
    /// It must be convertible into a boxed `NetworkStream + Send` trait object.
    type Stream: Into<Box<dyn NetworkStream + Send>>;

    /// Connect to a remote address.
    ///
    /// `host` and `port` identify the remote endpoint; `scheme` is the URL
    /// scheme (the implementations in this file compare it against `"http"`
    /// and `"https"`).
    fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Self::Stream>;
}
107
108impl<T: NetworkStream + Send> From<T> for Box<dyn NetworkStream + Send> {
109    fn from(s: T) -> Box<dyn NetworkStream + Send> {
110        Box::new(s)
111    }
112}
113
114impl fmt::Debug for Box<dyn NetworkStream + Send> {
115    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
116        fmt.pad("Box<NetworkStream>")
117    }
118}
119
120impl dyn NetworkStream {
121    unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
122        mem::transmute(traitobject::data(self))
123    }
124
125    unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
126        mem::transmute(traitobject::data_mut(self))
127    }
128
129    unsafe fn downcast_unchecked<T: 'static>(self: Box<dyn NetworkStream>) -> Box<T> {
130        let raw: *mut dyn NetworkStream = mem::transmute(self);
131        mem::transmute(traitobject::data_mut(raw))
132    }
133}
134
135impl dyn NetworkStream {
136    /// Is the underlying type in this trait object a `T`?
137    #[inline]
138    pub fn is<T: Any>(&self) -> bool {
139        (*self).get_type() == TypeId::of::<T>()
140    }
141
142    /// If the underlying type is `T`, get a reference to the contained data.
143    #[inline]
144    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
145        if self.is::<T>() {
146            Some(unsafe { self.downcast_ref_unchecked() })
147        } else {
148            None
149        }
150    }
151
152    /// If the underlying type is `T`, get a mutable reference to the contained
153    /// data.
154    #[inline]
155    pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
156        if self.is::<T>() {
157            Some(unsafe { self.downcast_mut_unchecked() })
158        } else {
159            None
160        }
161    }
162
163    /// If the underlying type is `T`, extract it.
164    #[inline]
165    pub fn downcast<T: Any>(self: Box<dyn NetworkStream>)
166                            -> Result<Box<T>, Box<dyn NetworkStream>> {
167        if self.is::<T>() {
168            Ok(unsafe { self.downcast_unchecked() })
169        } else {
170            Err(self)
171        }
172    }
173}
174
175impl dyn NetworkStream + Send {
176    unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
177        mem::transmute(traitobject::data(self))
178    }
179
180    unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
181        mem::transmute(traitobject::data_mut(self))
182    }
183
184    unsafe fn downcast_unchecked<T: 'static>(self: Box<dyn NetworkStream + Send>) -> Box<T> {
185        let raw: *mut dyn NetworkStream = mem::transmute(self);
186        mem::transmute(traitobject::data_mut(raw))
187    }
188}
189
190impl dyn NetworkStream + Send {
191    /// Is the underlying type in this trait object a `T`?
192    #[inline]
193    pub fn is<T: Any>(&self) -> bool {
194        (*self).get_type() == TypeId::of::<T>()
195    }
196
197    /// If the underlying type is `T`, get a reference to the contained data.
198    #[inline]
199    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
200        if self.is::<T>() {
201            Some(unsafe { self.downcast_ref_unchecked() })
202        } else {
203            None
204        }
205    }
206
207    /// If the underlying type is `T`, get a mutable reference to the contained
208    /// data.
209    #[inline]
210    pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
211        if self.is::<T>() {
212            Some(unsafe { self.downcast_mut_unchecked() })
213        } else {
214            None
215        }
216    }
217
218    /// If the underlying type is `T`, extract it.
219    #[inline]
220    pub fn downcast<T: Any>(self: Box<dyn NetworkStream + Send>)
221                            -> Result<Box<T>, Box<dyn NetworkStream + Send>> {
222        if self.is::<T>() {
223            Ok(unsafe { self.downcast_unchecked() })
224        } else {
225            Err(self)
226        }
227    }
228}
229
/// A `NetworkListener` for `HttpStream`s.
///
/// Cloning is cheap with respect to the socket: the listener is held in an
/// `Arc`, so clones share it.
#[derive(Clone)]
pub struct HttpListener {
    // Shared handle to the bound listening socket.
    listener: Arc<TcpListener>,

    // Timeouts applied to every stream returned by `accept`; both start as
    // `None` when the listener is built via `From<TcpListener>`.
    read_timeout: Option<Duration>,
    write_timeout: Option<Duration>,
}
238
239impl From<TcpListener> for HttpListener {
240    fn from(listener: TcpListener) -> HttpListener {
241        HttpListener {
242            listener: Arc::new(listener),
243
244            read_timeout: None,
245            write_timeout: None,
246        }
247    }
248}
249
250impl HttpListener {
251    /// Start listening to an address over HTTP.
252    pub fn new<To: ToSocketAddrs>(addr: To) -> crate::Result<HttpListener> {
253        Ok(HttpListener::from(r#try!(TcpListener::bind(addr))))
254    }
255}
256
257impl NetworkListener for HttpListener {
258    type Stream = HttpStream;
259
260    #[inline]
261    fn accept(&mut self) -> crate::Result<HttpStream> {
262        let stream = HttpStream(r#try!(self.listener.accept()).0);
263        r#try!(stream.set_read_timeout(self.read_timeout));
264        r#try!(stream.set_write_timeout(self.write_timeout));
265        Ok(stream)
266    }
267
268    #[inline]
269    fn local_addr(&mut self) -> io::Result<SocketAddr> {
270        self.listener.local_addr()
271    }
272
273    fn set_read_timeout(&mut self, duration: Option<Duration>) {
274        self.read_timeout = duration;
275    }
276
277    fn set_write_timeout(&mut self, duration: Option<Duration>) {
278        self.write_timeout = duration;
279    }
280}
281
/// Windows only: exposes the raw socket of the wrapped listener.
#[cfg(windows)]
impl ::std::os::windows::io::AsRawSocket for HttpListener {
    fn as_raw_socket(&self) -> ::std::os::windows::io::RawSocket {
        self.listener.as_raw_socket()
    }
}
288
/// Windows only: builds an `HttpListener` (no timeouts set) from a raw socket.
#[cfg(windows)]
impl ::std::os::windows::io::FromRawSocket for HttpListener {
    /// # Safety
    ///
    /// `sock` is passed straight to `TcpListener::from_raw_socket`; the caller
    /// must uphold whatever that constructor requires of the handle.
    unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpListener {
        HttpListener::from(TcpListener::from_raw_socket(sock))
    }
}
295
/// Unix only: exposes the raw file descriptor of the wrapped listener.
#[cfg(unix)]
impl ::std::os::unix::io::AsRawFd for HttpListener {
    fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd {
        self.listener.as_raw_fd()
    }
}
302
/// Unix only: builds an `HttpListener` (no timeouts set) from a raw descriptor.
#[cfg(unix)]
impl ::std::os::unix::io::FromRawFd for HttpListener {
    /// # Safety
    ///
    /// `fd` is passed straight to `TcpListener::from_raw_fd`; the caller must
    /// uphold whatever that constructor requires of the descriptor.
    unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpListener {
        HttpListener::from(TcpListener::from_raw_fd(fd))
    }
}
309
/// A wrapper around a `TcpStream`.
///
/// The wrapped stream is public and can be reached as `.0`.
pub struct HttpStream(pub TcpStream);
312
313impl Clone for HttpStream {
314    #[inline]
315    fn clone(&self) -> HttpStream {
316        HttpStream(self.0.try_clone().unwrap())
317    }
318}
319
320impl fmt::Debug for HttpStream {
321    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
322        f.write_str("HttpStream(_)")
323    }
324}
325
326impl Read for HttpStream {
327    #[inline]
328    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
329        self.0.read(buf)
330    }
331}
332
333impl Write for HttpStream {
334    #[inline]
335    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
336        self.0.write(msg)
337    }
338    #[inline]
339    fn flush(&mut self) -> io::Result<()> {
340        self.0.flush()
341    }
342}
343
/// Windows only: exposes the raw socket of the wrapped stream.
#[cfg(windows)]
impl ::std::os::windows::io::AsRawSocket for HttpStream {
    fn as_raw_socket(&self) -> ::std::os::windows::io::RawSocket {
        self.0.as_raw_socket()
    }
}
350
/// Windows only: builds an `HttpStream` from a raw socket.
#[cfg(windows)]
impl ::std::os::windows::io::FromRawSocket for HttpStream {
    /// # Safety
    ///
    /// `sock` is passed straight to `TcpStream::from_raw_socket`; the caller
    /// must uphold whatever that constructor requires of the handle.
    unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpStream {
        HttpStream(TcpStream::from_raw_socket(sock))
    }
}
357
/// Unix only: exposes the raw file descriptor of the wrapped stream.
#[cfg(unix)]
impl ::std::os::unix::io::AsRawFd for HttpStream {
    fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd {
        self.0.as_raw_fd()
    }
}
364
/// Unix only: builds an `HttpStream` from a raw descriptor.
#[cfg(unix)]
impl ::std::os::unix::io::FromRawFd for HttpStream {
    /// # Safety
    ///
    /// `fd` is passed straight to `TcpStream::from_raw_fd`; the caller must
    /// uphold whatever that constructor requires of the descriptor.
    unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpStream {
        HttpStream(TcpStream::from_raw_fd(fd))
    }
}
371
372impl NetworkStream for HttpStream {
373    #[inline]
374    fn peer_addr(&mut self) -> io::Result<SocketAddr> {
375        self.0.peer_addr()
376    }
377
378    #[inline]
379    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
380        self.0.set_read_timeout(dur)
381    }
382
383    #[inline]
384    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
385        self.0.set_write_timeout(dur)
386    }
387
388    #[inline]
389    fn close(&mut self, how: Shutdown) -> io::Result<()> {
390        match self.0.shutdown(how) {
391            Ok(_) => Ok(()),
392            // see https://github.com/hyperium/hyper/issues/508
393            Err(ref e) if e.kind() == ErrorKind::NotConnected => Ok(()),
394            err => err
395        }
396    }
397
398    fn set_nonblocking(&self, b: bool) {
399        #[cfg(unix)]
400        self.0.set_nonblocking(b);
401    }
402
403    fn reset_io(&self) {
404        #[cfg(unix)]
405        use cogo::io::WaitIo;
406        #[cfg(unix)]
407        self.0.reset_io();
408    }
409
410    fn wait_io(&self) {
411        #[cfg(unix)]
412        use cogo::io::WaitIo;
413        #[cfg(unix)]
414        self.0.wait_io();
415    }
416}
417
/// A connector that will produce HttpStreams.
///
/// This is a stateless unit struct; only the `"http"` scheme is accepted by
/// its `NetworkConnector` implementation.
#[derive(Debug, Clone, Default)]
pub struct HttpConnector;
421
422impl NetworkConnector for HttpConnector {
423    type Stream = HttpStream;
424
425    fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<HttpStream> {
426        let addr = &(host, port);
427        Ok(r#try!(match scheme {
428            "http" => {
429                debug!("http scheme");
430                Ok(HttpStream(r#try!(TcpStream::connect(addr))))
431            },
432            _ => {
433                Err(io::Error::new(io::ErrorKind::InvalidInput,
434                                "Invalid scheme for Http"))
435            }
436        }))
437    }
438}
439
440/// A closure as a connector used to generate `TcpStream`s per request
441///
442/// # Example
443///
444/// Basic example:
445///
446/// ```norun
447/// Client::with_connector(|addr: &str, port: u16, scheme: &str| {
448///     TcpStream::connect(&(addr, port))
449/// });
450/// ```
451///
452/// Example using `TcpBuilder` from the net2 crate if you want to configure your source socket:
453///
454/// ```norun
455/// Client::with_connector(|addr: &str, port: u16, scheme: &str| {
456///     let b = r#try!(TcpBuilder::new_v4());
457///     r#try!(b.bind("127.0.0.1:0"));
458///     b.connect(&(addr, port))
459/// });
460/// ```
461impl<F> NetworkConnector for F where F: Fn(&str, u16, &str) -> io::Result<TcpStream> {
462    type Stream = HttpStream;
463
464    fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<HttpStream> {
465        Ok(HttpStream(r#try!((*self)(host, port, scheme))))
466    }
467}
468
/// An abstraction to allow any SSL implementation to be used with client-side HttpsStreams.
///
/// `T` is the plain stream type being wrapped and defaults to `HttpStream`.
pub trait SslClient<T: NetworkStream + Send + Clone = HttpStream> {
    /// The protected stream.
    type Stream: NetworkStream + Send + Clone;
    /// Wrap a client stream with SSL.
    ///
    /// `stream` is the already-connected plain stream; `host` is the host
    /// name the connection was opened to (presumably used for server-name
    /// checks - confirm with the concrete implementation).
    fn wrap_client(&self, stream: T, host: &str) -> crate::Result<Self::Stream>;
}
476
/// An abstraction to allow any SSL implementation to be used with server-side HttpsStreams.
///
/// `T` is the plain stream type being wrapped and defaults to `HttpStream`.
pub trait SslServer<T: NetworkStream + Send + Clone = HttpStream> {
    /// The protected stream.
    type Stream: NetworkStream + Send + Clone;
    /// Wrap a server stream with SSL.
    ///
    /// `stream` is the freshly accepted plain stream.
    fn wrap_server(&self, stream: T) -> crate::Result<Self::Stream>;
}
484
/// A stream over the HTTP protocol, possibly protected by SSL.
///
/// `S` is the SSL-protected stream type. All `Read`, `Write` and
/// `NetworkStream` operations are dispatched to whichever variant is active.
#[derive(Debug, Clone)]
pub enum HttpsStream<S: NetworkStream> {
    /// A plain text stream.
    Http(HttpStream),
    /// A stream protected by SSL.
    Https(S),
}
493
494impl<S: NetworkStream> Read for HttpsStream<S> {
495    #[inline]
496    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
497        match *self {
498            HttpsStream::Http(ref mut s) => s.read(buf),
499            HttpsStream::Https(ref mut s) => s.read(buf)
500        }
501    }
502}
503
504impl<S: NetworkStream> Write for HttpsStream<S> {
505    #[inline]
506    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
507        match *self {
508            HttpsStream::Http(ref mut s) => s.write(msg),
509            HttpsStream::Https(ref mut s) => s.write(msg)
510        }
511    }
512
513    #[inline]
514    fn flush(&mut self) -> io::Result<()> {
515        match *self {
516            HttpsStream::Http(ref mut s) => s.flush(),
517            HttpsStream::Https(ref mut s) => s.flush()
518        }
519    }
520}
521
522impl<S: NetworkStream> NetworkStream for HttpsStream<S> {
523    #[inline]
524    fn peer_addr(&mut self) -> io::Result<SocketAddr> {
525        match *self {
526            HttpsStream::Http(ref mut s) => s.peer_addr(),
527            HttpsStream::Https(ref mut s) => s.peer_addr()
528        }
529    }
530
531    #[inline]
532    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
533        match *self {
534            HttpsStream::Http(ref inner) => inner.0.set_read_timeout(dur),
535            HttpsStream::Https(ref inner) => inner.set_read_timeout(dur)
536        }
537    }
538
539    #[inline]
540    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
541        match *self {
542            HttpsStream::Http(ref inner) => inner.0.set_write_timeout(dur),
543            HttpsStream::Https(ref inner) => inner.set_write_timeout(dur)
544        }
545    }
546
547    #[inline]
548    fn close(&mut self, how: Shutdown) -> io::Result<()> {
549        match *self {
550            HttpsStream::Http(ref mut s) => s.close(how),
551            HttpsStream::Https(ref mut s) => s.close(how)
552        }
553    }
554    #[inline]
555    fn set_nonblocking(&self, b: bool) {
556        match *self {
557            HttpsStream::Http(ref s) => s.set_nonblocking(b),
558            HttpsStream::Https(ref s) => s.set_nonblocking(b)
559        }
560    }
561    #[inline]
562    fn reset_io(&self) {
563        match *self {
564            HttpsStream::Http(ref s) => s.reset_io(),
565            HttpsStream::Https(ref s) => s.reset_io()
566        }
567    }
568    #[inline]
569    fn wait_io(&self) {
570        match *self {
571            HttpsStream::Http(ref s) => s.wait_io(),
572            HttpsStream::Https(ref s) => s.wait_io()
573        }
574    }
575}
576
/// A Http Listener over SSL.
///
/// Connections are accepted by the inner `HttpListener` and then wrapped by
/// the `SslServer` implementation.
#[derive(Clone)]
pub struct HttpsListener<S: SslServer> {
    // Plain listener that accepts the TCP connections.
    listener: HttpListener,
    // SSL implementation applied to each accepted stream.
    ssl: S,
}
583
584impl<S: SslServer> HttpsListener<S> {
585    /// Start listening to an address over HTTPS.
586    pub fn new<To: ToSocketAddrs>(addr: To, ssl: S) -> crate::Result<HttpsListener<S>> {
587        HttpListener::new(addr).map(|l| HttpsListener {
588            listener: l,
589            ssl: ssl,
590        })
591    }
592
593    /// Construct an HttpsListener from a bound `TcpListener`.
594    pub fn with_listener(listener: HttpListener, ssl: S) -> HttpsListener<S> {
595        HttpsListener {
596            listener: listener,
597            ssl: ssl,
598        }
599    }
600}
601
602impl<S: SslServer + Clone> NetworkListener for HttpsListener<S> {
603    type Stream = S::Stream;
604
605    #[inline]
606    fn accept(&mut self) -> crate::Result<S::Stream> {
607        self.listener.accept().and_then(|s| self.ssl.wrap_server(s))
608    }
609
610    #[inline]
611    fn local_addr(&mut self) -> io::Result<SocketAddr> {
612        self.listener.local_addr()
613    }
614
615    fn set_read_timeout(&mut self, duration: Option<Duration>) {
616        self.listener.set_read_timeout(duration)
617    }
618
619    fn set_write_timeout(&mut self, duration: Option<Duration>) {
620        self.listener.set_write_timeout(duration)
621    }
622}
623
/// A connector that can protect HTTP streams using SSL.
///
/// `C` is the connector that opens the plain stream and defaults to
/// `HttpConnector`.
#[derive(Debug, Default)]
pub struct HttpsConnector<S: SslClient, C: NetworkConnector = HttpConnector> {
    // SSL implementation used to wrap streams for the `"https"` scheme.
    ssl: S,
    // Connector that opens the underlying plain stream.
    connector: C,
}
630
631impl<S: SslClient> HttpsConnector<S, HttpConnector> {
632    /// Create a new connector using the provided SSL implementation.
633    pub fn new(s: S) -> HttpsConnector<S, HttpConnector> {
634        HttpsConnector::with_connector(s, HttpConnector)
635    }
636}
637
638impl<S: SslClient, C: NetworkConnector> HttpsConnector<S, C> {
639    /// Create a new connector using the provided SSL implementation.
640    pub fn with_connector(s: S, connector: C) -> HttpsConnector<S, C> {
641        HttpsConnector { ssl: s, connector: connector }
642    }
643}
644
645impl<S: SslClient, C: NetworkConnector<Stream=HttpStream>> NetworkConnector for HttpsConnector<S, C> {
646    type Stream = HttpsStream<S::Stream>;
647
648    fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Self::Stream> {
649        let stream = r#try!(self.connector.connect(host, port, "http"));
650        if scheme == "https" {
651            debug!("https scheme");
652            self.ssl.wrap_client(stream, host).map(HttpsStream::Https)
653        } else {
654            Ok(HttpsStream::Http(stream))
655        }
656    }
657}
658
659
/// The connector type used by default: plain `HttpConnector`.
#[doc(hidden)]
pub type DefaultConnector = HttpConnector;
662
#[cfg(test)]
mod tests {
    use super::NetworkStream;
    use crate::mock::MockStream;

    /// Boxes a fresh `MockStream` behind the `NetworkStream + Send` trait object.
    fn boxed_mock() -> Box<dyn NetworkStream + Send> {
        Box::new(MockStream::new())
    }

    /// The checked downcast recovers the original concrete stream.
    #[test]
    fn test_downcast_box_stream() {
        let expected = Box::new(MockStream::new());

        let recovered = boxed_mock().downcast::<MockStream>().ok().unwrap();
        assert_eq!(recovered, expected);
    }

    /// The unchecked downcast recovers the original concrete stream.
    #[test]
    fn test_downcast_unchecked_box_stream() {
        let expected = Box::new(MockStream::new());

        // SAFETY: `boxed_mock` always boxes a `MockStream`.
        let recovered = unsafe { boxed_mock().downcast_unchecked::<MockStream>() };
        assert_eq!(recovered, expected);
    }
}
686