mco_http/net/
mod.rs

1//! A collection of traits abstracting over Listeners and Streams.
2use std::any::{Any, TypeId};
3use std::fmt;
4use std::io::{self, ErrorKind, Read, Write};
5use std::net::{SocketAddr, ToSocketAddrs, TcpStream, TcpListener, Shutdown};
6use std::mem;
7use std::sync::Arc;
8
9use std::time::Duration;
10
11use typeable::Typeable;
12use traitobject;
13
/// The write-status indicating headers have not been written.
///
/// This enum has no variants, so a value of this type can never be constructed.
pub enum Fresh {}
16
/// The write-status indicating headers have been written.
///
/// This enum has no variants, so a value of this type can never be constructed.
pub enum Streaming {}
19
/// An abstraction to listen for connections on a certain port.
///
/// Implementors must be `Clone`.
pub trait NetworkListener: Clone {
    /// The stream produced for each connection.
    type Stream: NetworkStream + Send + Clone;

    /// Accepts a single incoming connection and returns its stream.
    fn accept(&mut self) -> crate::Result<Self::Stream>;

    /// Get the address this Listener ended up listening on.
    fn local_addr(&mut self) -> io::Result<SocketAddr>;

    /// Returns an iterator over incoming connections.
    ///
    /// The iterator borrows this listener mutably for as long as it is alive.
    fn incoming(&mut self) -> NetworkConnections<Self> {
        NetworkConnections(self)
    }

    /// Sets the read timeout for all streams that are accepted.
    ///
    /// The default implementation ignores the value and only logs a warning.
    fn set_read_timeout(&mut self, _: Option<Duration>) {
        // This default implementation is only here to prevent the addition of
        // these methods from being a breaking change. They should be removed
        // when the next breaking release is made.
        warn!("Ignoring read timeout");
    }

    /// Sets the write timeout for all streams that are accepted.
    ///
    /// The default implementation ignores the value and only logs a warning.
    fn set_write_timeout(&mut self, _: Option<Duration>) {
        // This default implementation is only here to prevent the addition of
        // these methods from being a breaking change. They should be removed
        // when the next breaking release is made.
        warn!("Ignoring write timeout");
    }
}
52
/// An iterator wrapper over a `NetworkListener`.
///
/// Holds a mutable borrow of the listener for the lifetime `'a`.
pub struct NetworkConnections<'a, N: NetworkListener + 'a>(&'a mut N);
55
56impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
57    type Item = crate::Result<N::Stream>;
58    fn next(&mut self) -> Option<crate::Result<N::Stream>> {
59        Some(self.0.accept())
60    }
61}
62
/// An abstraction over streams that a `Server` can utilize.
///
/// The `Any` and `Typeable` supertraits are what the `downcast*` helpers on
/// `dyn NetworkStream` rely on to recover the concrete stream type.
pub trait NetworkStream: Read + Write + Any + Send + Typeable {
    /// Get the remote address of the underlying connection.
    fn peer_addr(&mut self) -> io::Result<SocketAddr>;

    /// Set the maximum time to wait for a read to complete.
    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;

    /// Set the maximum time to wait for a write to complete.
    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;

    /// This will be called when Stream should no longer be kept alive.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    #[inline]
    fn close(&mut self, _how: Shutdown) -> io::Result<()> {
        Ok(())
    }

    // Unsure about name and implementation...

    /// Records whether the previous response was expected to carry no content.
    ///
    /// The default implementation discards the flag.
    #[doc(hidden)]
    fn set_previous_response_expected_no_content(&mut self, _expected: bool) { }

    /// Reports whether the previous response was expected to carry no content.
    ///
    /// The default implementation always returns `false`.
    #[doc(hidden)]
    fn previous_response_expected_no_content(&self) -> bool {
        false
    }
}
90
/// A connector creates a NetworkStream.
pub trait NetworkConnector {
    /// Type of `Stream` to create.
    ///
    /// It must be convertible into a boxed `NetworkStream + Send` trait object.
    type Stream: Into<Box<dyn NetworkStream + Send>>;

    /// Connect to a remote address.
    ///
    /// `host` and `port` identify the remote endpoint; `scheme` is the URL scheme
    /// (the connectors in this module compare it against `"http"` / `"https"`).
    fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Self::Stream>;
}
99
100impl<T: NetworkStream + Send> From<T> for Box<dyn NetworkStream + Send> {
101    fn from(s: T) -> Box<dyn NetworkStream + Send> {
102        Box::new(s)
103    }
104}
105
106impl fmt::Debug for Box<dyn NetworkStream + Send> {
107    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
108        fmt.pad("Box<NetworkStream>")
109    }
110}
111
// Unchecked downcasts for `dyn NetworkStream`. None of them verifies the concrete
// type; the checked `downcast*` methods call `is::<T>()` before using them.
impl dyn NetworkStream {
    /// Reinterprets the trait object's data as `&T` without a type check.
    ///
    /// # Safety
    ///
    /// The concrete type behind `self` must be `T`.
    unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
        // NOTE(review): relies on `traitobject::data` yielding the object's data
        // pointer — confirm against the `traitobject` crate.
        mem::transmute(traitobject::data(self))
    }

    /// Reinterprets the trait object's data as `&mut T` without a type check.
    ///
    /// # Safety
    ///
    /// The concrete type behind `self` must be `T`.
    unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
        mem::transmute(traitobject::data_mut(self))
    }

    /// Converts the box into a `Box<T>` without a type check.
    ///
    /// # Safety
    ///
    /// The concrete type behind `self` must be `T`.
    unsafe fn downcast_unchecked<T: 'static>(self: Box<dyn NetworkStream>) -> Box<T>  {
        // The box is first turned into a raw trait-object pointer, then its data
        // pointer is reinterpreted as the `Box<T>`.
        let raw: *mut dyn NetworkStream = mem::transmute(self);
        mem::transmute(traitobject::data_mut(raw))
    }
}
126
127impl dyn NetworkStream {
128    /// Is the underlying type in this trait object a `T`?
129    #[inline]
130    pub fn is<T: Any>(&self) -> bool {
131        (*self).get_type() == TypeId::of::<T>()
132    }
133
134    /// If the underlying type is `T`, get a reference to the contained data.
135    #[inline]
136    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
137        if self.is::<T>() {
138            Some(unsafe { self.downcast_ref_unchecked() })
139        } else {
140            None
141        }
142    }
143
144    /// If the underlying type is `T`, get a mutable reference to the contained
145    /// data.
146    #[inline]
147    pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
148        if self.is::<T>() {
149            Some(unsafe { self.downcast_mut_unchecked() })
150        } else {
151            None
152        }
153    }
154
155    /// If the underlying type is `T`, extract it.
156    #[inline]
157    pub fn downcast<T: Any>(self: Box<dyn NetworkStream>)
158                            -> Result<Box<T>, Box<dyn NetworkStream>> {
159        if self.is::<T>() {
160            Ok(unsafe { self.downcast_unchecked() })
161        } else {
162            Err(self)
163        }
164    }
165}
166
// Unchecked downcasts for `dyn NetworkStream + Send`. None of them verifies the
// concrete type; the checked `downcast*` methods call `is::<T>()` before using them.
impl dyn NetworkStream + Send {
    /// Reinterprets the trait object's data as `&T` without a type check.
    ///
    /// # Safety
    ///
    /// The concrete type behind `self` must be `T`.
    unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
        // NOTE(review): relies on `traitobject::data` yielding the object's data
        // pointer — confirm against the `traitobject` crate.
        mem::transmute(traitobject::data(self))
    }

    /// Reinterprets the trait object's data as `&mut T` without a type check.
    ///
    /// # Safety
    ///
    /// The concrete type behind `self` must be `T`.
    unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
        mem::transmute(traitobject::data_mut(self))
    }

    /// Converts the box into a `Box<T>` without a type check.
    ///
    /// # Safety
    ///
    /// The concrete type behind `self` must be `T`.
    unsafe fn downcast_unchecked<T: 'static>(self: Box<dyn NetworkStream + Send>) -> Box<T>  {
        // The box is transmuted to a raw `dyn NetworkStream` pointer (the `Send`
        // marker is dropped here), then its data pointer is reinterpreted as the `Box<T>`.
        let raw: *mut dyn NetworkStream = mem::transmute(self);
        mem::transmute(traitobject::data_mut(raw))
    }
}
181
182impl dyn NetworkStream + Send {
183    /// Is the underlying type in this trait object a `T`?
184    #[inline]
185    pub fn is<T: Any>(&self) -> bool {
186        (*self).get_type() == TypeId::of::<T>()
187    }
188
189    /// If the underlying type is `T`, get a reference to the contained data.
190    #[inline]
191    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
192        if self.is::<T>() {
193            Some(unsafe { self.downcast_ref_unchecked() })
194        } else {
195            None
196        }
197    }
198
199    /// If the underlying type is `T`, get a mutable reference to the contained
200    /// data.
201    #[inline]
202    pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
203        if self.is::<T>() {
204            Some(unsafe { self.downcast_mut_unchecked() })
205        } else {
206            None
207        }
208    }
209
210    /// If the underlying type is `T`, extract it.
211    #[inline]
212    pub fn downcast<T: Any>(self: Box<dyn NetworkStream + Send>)
213                            -> Result<Box<T>, Box<dyn NetworkStream + Send>> {
214        if self.is::<T>() {
215            Ok(unsafe { self.downcast_unchecked() })
216        } else {
217            Err(self)
218        }
219    }
220}
221
/// A `NetworkListener` for `HttpStream`s.
///
/// Cloning is cheap: the underlying `TcpListener` lives behind an `Arc`, so every
/// clone refers to the same listener.
#[derive(Clone)]
pub struct HttpListener {
    // The bound TCP listener, shared between clones.
    listener: Arc<TcpListener>,

    // Timeouts applied to each stream returned by `accept`.
    read_timeout : Option<Duration>,
    write_timeout: Option<Duration>,
}
230
231impl From<TcpListener> for HttpListener {
232    fn from(listener: TcpListener) -> HttpListener {
233        HttpListener {
234            listener: Arc::new(listener),
235
236            read_timeout : None,
237            write_timeout: None,
238        }
239    }
240}
241
242impl HttpListener {
243    /// Start listening to an address over HTTP.
244    pub fn new<To: ToSocketAddrs>(addr: To) -> crate::Result<HttpListener> {
245        Ok(HttpListener::from(TcpListener::bind(addr)?))
246    }
247}
248
249impl NetworkListener for HttpListener {
250    type Stream = HttpStream;
251
252    #[inline]
253    fn accept(&mut self) -> crate::Result<HttpStream> {
254        let stream = HttpStream(self.listener.accept()?.0);
255        stream.set_read_timeout(self.read_timeout)?;
256        stream.set_write_timeout(self.write_timeout)?;
257        Ok(stream)
258    }
259
260    #[inline]
261    fn local_addr(&mut self) -> io::Result<SocketAddr> {
262        self.listener.local_addr()
263    }
264
265    fn set_read_timeout(&mut self, duration: Option<Duration>) {
266        self.read_timeout = duration;
267    }
268
269    fn set_write_timeout(&mut self, duration: Option<Duration>) {
270        self.write_timeout = duration;
271    }
272}
273
/// Exposes the raw Windows socket of the wrapped `TcpListener`.
#[cfg(windows)]
impl ::std::os::windows::io::AsRawSocket for HttpListener {
    fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
        let tcp: &TcpListener = &self.listener;
        ::std::os::windows::io::AsRawSocket::as_raw_socket(tcp)
    }
}
280
/// Builds an `HttpListener` around a raw Windows socket.
///
/// # Safety
///
/// Same contract as `TcpListener::from_raw_socket`, to which this delegates.
#[cfg(windows)]
impl ::std::os::windows::io::FromRawSocket for HttpListener {
    unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpListener {
        let tcp = <TcpListener as ::std::os::windows::io::FromRawSocket>::from_raw_socket(sock);
        tcp.into()
    }
}
287
288#[cfg(unix)]
289impl ::std::os::unix::io::AsRawFd for HttpListener {
290    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
291        self.listener.as_raw_fd()
292    }
293}
294
295#[cfg(unix)]
296impl ::std::os::unix::io::FromRawFd for HttpListener {
297    unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpListener {
298        HttpListener::from(TcpListener::from_raw_fd(fd))
299    }
300}
301
/// A wrapper around a `TcpStream`.
///
/// The wrapped stream is a public field, so it can be reached directly as `.0`.
pub struct HttpStream(pub TcpStream);
304
305impl Clone for HttpStream {
306    #[inline]
307    fn clone(&self) -> HttpStream {
308        HttpStream(self.0.try_clone().unwrap())
309    }
310}
311
312impl fmt::Debug for HttpStream {
313    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
314        f.write_str("HttpStream(_)")
315    }
316}
317
318impl Read for HttpStream {
319    #[inline]
320    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
321        self.0.read(buf)
322    }
323}
324
325impl Write for HttpStream {
326    #[inline]
327    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
328        self.0.write(msg)
329    }
330    #[inline]
331    fn flush(&mut self) -> io::Result<()> {
332        self.0.flush()
333    }
334}
335
/// Exposes the raw Windows socket of the wrapped `TcpStream`.
#[cfg(windows)]
impl ::std::os::windows::io::AsRawSocket for HttpStream {
    fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
        ::std::os::windows::io::AsRawSocket::as_raw_socket(&self.0)
    }
}
342
/// Builds an `HttpStream` around a raw Windows socket.
///
/// # Safety
///
/// Same contract as `TcpStream::from_raw_socket`, to which this delegates.
#[cfg(windows)]
impl ::std::os::windows::io::FromRawSocket for HttpStream {
    unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpStream {
        let socket = <TcpStream as ::std::os::windows::io::FromRawSocket>::from_raw_socket(sock);
        HttpStream(socket)
    }
}
349
350#[cfg(unix)]
351impl ::std::os::unix::io::AsRawFd for HttpStream {
352    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
353        self.0.as_raw_fd()
354    }
355}
356
357#[cfg(unix)]
358impl ::std::os::unix::io::FromRawFd for HttpStream {
359    unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpStream {
360        HttpStream(TcpStream::from_raw_fd(fd))
361    }
362}
363
364impl NetworkStream for HttpStream {
365    #[inline]
366    fn peer_addr(&mut self) -> io::Result<SocketAddr> {
367        self.0.peer_addr()
368    }
369
370    #[inline]
371    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
372        self.0.set_read_timeout(dur)
373    }
374
375    #[inline]
376    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
377        self.0.set_write_timeout(dur)
378    }
379
380    #[inline]
381    fn close(&mut self, how: Shutdown) -> io::Result<()> {
382        match self.0.shutdown(how) {
383            Ok(_) => Ok(()),
384            // see https://github.com/mco_httpium/mco_http/issues/508
385            Err(ref e) if e.kind() == ErrorKind::NotConnected => Ok(()),
386            err => err
387        }
388    }
389}
390
/// A connector that will produce HttpStreams.
///
/// This is a stateless unit struct; it only accepts the `"http"` scheme when connecting.
#[derive(Debug, Clone, Default)]
pub struct HttpConnector;
394
395impl NetworkConnector for HttpConnector {
396    type Stream = HttpStream;
397
398    fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<HttpStream> {
399        let addr = &(host, port);
400        Ok(match scheme {
401            "http" => {
402                debug!("http scheme");
403                Ok(HttpStream(TcpStream::connect(addr)?))
404            },
405            _ => {
406                Err(io::Error::new(io::ErrorKind::InvalidInput,
407                                "Invalid scheme for Http"))
408            }
409        }?)
410    }
411}
412
413/// A closure as a connector used to generate `TcpStream`s per request
414///
415/// # Example
416///
417/// Basic example:
418///
419/// ```norun
420/// Client::with_connector(|addr: &str, port: u16, scheme: &str| {
421///     TcpStream::connect(&(addr, port))
422/// });
423/// ```
424///
425/// Example using `TcpBuilder` from the net2 crate if you want to configure your source socket:
426///
427/// ```norun
428/// Client::with_connector(|addr: &str, port: u16, scheme: &str| {
429///     let b = TcpBuilder::new_v4());
430///     b.bind("127.0.0.1:0"));
431///     b.connect(&(addr, port))
432/// });
433/// ```
434impl<F> NetworkConnector for F where F: Fn(&str, u16, &str) -> io::Result<TcpStream> {
435    type Stream = HttpStream;
436
437    fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<HttpStream> {
438        Ok(HttpStream((*self)(host, port, scheme)?))
439    }
440}
441
/// An abstraction to allow any SSL implementation to be used with client-side HttpsStreams.
///
/// `T` is the type of stream being wrapped; it defaults to `HttpStream`.
pub trait SslClient<T: NetworkStream + Send + Clone = HttpStream> {
    /// The protected stream.
    type Stream: NetworkStream + Send + Clone;
    /// Wrap a client stream with SSL.
    ///
    /// `host` is the host name that was passed to the connector for this stream.
    fn wrap_client(&self, stream: T, host: &str) -> crate::Result<Self::Stream>;
}
449
/// An abstraction to allow any SSL implementation to be used with server-side HttpsStreams.
///
/// `T` is the type of stream being wrapped; it defaults to `HttpStream`.
pub trait SslServer<T: NetworkStream + Send + Clone = HttpStream> {
    /// The protected stream.
    type Stream: NetworkStream + Send + Clone;
    /// Wrap a server stream with SSL.
    fn wrap_server(&self, stream: T) -> crate::Result<Self::Stream>;
}
457
/// A stream over the HTTP protocol, possibly protected by SSL.
///
/// `S` is the type of the SSL-protected stream held by the `Https` variant.
#[derive(Debug, Clone)]
pub enum HttpsStream<S: NetworkStream> {
    /// A plain text stream.
    Http(HttpStream),
    /// A stream protected by SSL.
    Https(S)
}
466
467impl<S: NetworkStream> Read for HttpsStream<S> {
468    #[inline]
469    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
470        match *self {
471            HttpsStream::Http(ref mut s) => s.read(buf),
472            HttpsStream::Https(ref mut s) => s.read(buf)
473        }
474    }
475}
476
477impl<S: NetworkStream> Write for HttpsStream<S> {
478    #[inline]
479    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
480        match *self {
481            HttpsStream::Http(ref mut s) => s.write(msg),
482            HttpsStream::Https(ref mut s) => s.write(msg)
483        }
484    }
485
486    #[inline]
487    fn flush(&mut self) -> io::Result<()> {
488        match *self {
489            HttpsStream::Http(ref mut s) => s.flush(),
490            HttpsStream::Https(ref mut s) => s.flush()
491        }
492    }
493}
494
495impl<S: NetworkStream> NetworkStream for HttpsStream<S> {
496    #[inline]
497    fn peer_addr(&mut self) -> io::Result<SocketAddr> {
498        match *self {
499            HttpsStream::Http(ref mut s) => s.peer_addr(),
500            HttpsStream::Https(ref mut s) => s.peer_addr()
501        }
502    }
503
504    #[inline]
505    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
506        match *self {
507            HttpsStream::Http(ref inner) => inner.0.set_read_timeout(dur),
508            HttpsStream::Https(ref inner) => inner.set_read_timeout(dur)
509        }
510    }
511
512    #[inline]
513    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
514        match *self {
515            HttpsStream::Http(ref inner) => inner.0.set_write_timeout(dur),
516            HttpsStream::Https(ref inner) => inner.set_write_timeout(dur)
517        }
518    }
519
520    #[inline]
521    fn close(&mut self, how: Shutdown) -> io::Result<()> {
522        match *self {
523            HttpsStream::Http(ref mut s) => s.close(how),
524            HttpsStream::Https(ref mut s) => s.close(how)
525        }
526    }
527}
528
/// A Http Listener over SSL.
///
/// Each accepted connection comes from the inner `HttpListener` and is then
/// wrapped by the `SslServer` implementation.
#[derive(Clone)]
pub struct HttpsListener<S: SslServer> {
    // Plain listener that produces the `HttpStream`s to be wrapped.
    listener: HttpListener,
    // SSL implementation used to wrap each accepted stream.
    ssl: S,
}
535
536impl<S: SslServer> HttpsListener<S> {
537    /// Start listening to an address over HTTPS.
538    pub fn new<To: ToSocketAddrs>(addr: To, ssl: S) -> crate::Result<HttpsListener<S>> {
539        HttpListener::new(addr).map(|l| HttpsListener {
540            listener: l,
541            ssl: ssl
542        })
543    }
544
545    /// Construct an HttpsListener from a bound `TcpListener`.
546    pub fn with_listener(listener: HttpListener, ssl: S) -> HttpsListener<S> {
547        HttpsListener {
548            listener: listener,
549            ssl: ssl
550        }
551    }
552}
553
554impl<S: SslServer + Clone> NetworkListener for HttpsListener<S> {
555    type Stream = S::Stream;
556
557    #[inline]
558    fn accept(&mut self) -> crate::Result<S::Stream> {
559        self.listener.accept().and_then(|s| self.ssl.wrap_server(s))
560    }
561
562    #[inline]
563    fn local_addr(&mut self) -> io::Result<SocketAddr> {
564        self.listener.local_addr()
565    }
566
567    fn set_read_timeout(&mut self, duration: Option<Duration>) {
568        self.listener.set_read_timeout(duration)
569    }
570
571    fn set_write_timeout(&mut self, duration: Option<Duration>) {
572        self.listener.set_write_timeout(duration)
573    }
574}
575
/// A connector that can protect HTTP streams using SSL.
///
/// `C` is the connector that opens the underlying stream; it defaults to `HttpConnector`.
#[derive(Debug, Default)]
pub struct HttpsConnector<S: SslClient, C: NetworkConnector = HttpConnector> {
    // SSL implementation used to wrap streams when the scheme is "https".
    ssl: S,
    // Connector that opens the underlying plain stream.
    connector: C,
}
582
583impl<S: SslClient> HttpsConnector<S, HttpConnector> {
584    /// Create a new connector using the provided SSL implementation.
585    pub fn new(s: S) -> HttpsConnector<S, HttpConnector> {
586        HttpsConnector::with_connector(s, HttpConnector)
587    }
588}
589
590impl<S: SslClient, C: NetworkConnector> HttpsConnector<S, C> {
591    /// Create a new connector using the provided SSL implementation.
592    pub fn with_connector(s: S, connector: C) -> HttpsConnector<S, C> {
593        HttpsConnector { ssl: s, connector: connector }
594    }
595}
596
597impl<S: SslClient, C: NetworkConnector<Stream=HttpStream>> NetworkConnector for HttpsConnector<S, C> {
598    type Stream = HttpsStream<S::Stream>;
599
600    fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Self::Stream> {
601        let stream = self.connector.connect(host, port, "http")?;
602        if scheme == "https" {
603            debug!("https scheme");
604            self.ssl.wrap_client(stream, host).map(HttpsStream::Https)
605        } else {
606            Ok(HttpsStream::Http(stream))
607        }
608    }
609}
610
611
// Alias for the connector type used by default: the plain-HTTP `HttpConnector`.
#[doc(hidden)]
pub type DefaultConnector = HttpConnector;
614
#[cfg(test)]
mod tests {
    use crate::mock::MockStream;
    use super::{NetworkStream};

    /// Builds a fresh `MockStream` erased to a boxed `NetworkStream + Send`.
    fn boxed_mock() -> Box<dyn NetworkStream + Send> {
        Box::new(MockStream::new())
    }

    /// The checked downcast recovers the original `MockStream`.
    #[test]
    fn test_downcast_box_stream() {
        let recovered = boxed_mock().downcast::<MockStream>().ok().unwrap();
        assert_eq!(recovered, Box::new(MockStream::new()));
    }

    /// The unchecked downcast recovers the original `MockStream` as well.
    #[test]
    fn test_downcast_unchecked_box_stream() {
        // SAFETY: `boxed_mock` always boxes a `MockStream`.
        let recovered = unsafe { boxed_mock().downcast_unchecked::<MockStream>() };
        assert_eq!(recovered, Box::new(MockStream::new()));
    }
}
638