1use std::any::{Any, TypeId};
3use std::fmt;
4use std::io::{self, ErrorKind, Read, Write};
5use std::net::{SocketAddr, ToSocketAddrs, Shutdown};
6use std::mem;
7use std::sync::Arc;
8
9use std::time::Duration;
10use crate::runtime::TcpListener;
11
12use typeable::Typeable;
13use traitobject;
14use crate::runtime::TcpStream;
15
16pub enum Fresh {}
18
19pub enum Streaming {}
21
22pub trait NetworkListener: Clone {
24 type Stream: NetworkStream + Send + Clone;
26
27 fn accept(&mut self) -> crate::Result<Self::Stream>;
29
30 fn local_addr(&mut self) -> io::Result<SocketAddr>;
32
33 fn incoming(&mut self) -> NetworkConnections<Self> {
35 NetworkConnections(self)
36 }
37
38 fn set_read_timeout(&mut self, _: Option<Duration>) {
40 warn!("Ignoring read timeout");
44 }
45
46 fn set_write_timeout(&mut self, _: Option<Duration>) {
48 warn!("Ignoring write timeout");
52 }
53}
54
55pub struct NetworkConnections<'a, N: NetworkListener + 'a>(&'a mut N);
57
58impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
59 type Item = crate::Result<N::Stream>;
60 fn next(&mut self) -> Option<crate::Result<N::Stream>> {
61 Some(self.0.accept())
62 }
63}
64
65pub trait NetworkStream: Read + Write + Any + Send + Typeable {
67 fn peer_addr(&mut self) -> io::Result<SocketAddr>;
69
70 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
72
73 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
75
76 #[inline]
78 fn close(&mut self, _how: Shutdown) -> io::Result<()> {
79 Ok(())
80 }
81
82 #[doc(hidden)]
85 fn set_previous_response_expected_no_content(&mut self, _expected: bool) {}
86
87 #[doc(hidden)]
88 fn previous_response_expected_no_content(&self) -> bool {
89 false
90 }
91
92 fn set_nonblocking(&self, b: bool);
93
94 fn reset_io(&self);
95
96 fn wait_io(&self);
97}
98
99pub trait NetworkConnector {
101 type Stream: Into<Box<dyn NetworkStream + Send>>;
103
104 fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Self::Stream>;
106}
107
108impl<T: NetworkStream + Send> From<T> for Box<dyn NetworkStream + Send> {
109 fn from(s: T) -> Box<dyn NetworkStream + Send> {
110 Box::new(s)
111 }
112}
113
114impl fmt::Debug for Box<dyn NetworkStream + Send> {
115 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
116 fmt.pad("Box<NetworkStream>")
117 }
118}
119
120impl dyn NetworkStream {
121 unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
122 mem::transmute(traitobject::data(self))
123 }
124
125 unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
126 mem::transmute(traitobject::data_mut(self))
127 }
128
129 unsafe fn downcast_unchecked<T: 'static>(self: Box<dyn NetworkStream>) -> Box<T> {
130 let raw: *mut dyn NetworkStream = mem::transmute(self);
131 mem::transmute(traitobject::data_mut(raw))
132 }
133}
134
135impl dyn NetworkStream {
136 #[inline]
138 pub fn is<T: Any>(&self) -> bool {
139 (*self).get_type() == TypeId::of::<T>()
140 }
141
142 #[inline]
144 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
145 if self.is::<T>() {
146 Some(unsafe { self.downcast_ref_unchecked() })
147 } else {
148 None
149 }
150 }
151
152 #[inline]
155 pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
156 if self.is::<T>() {
157 Some(unsafe { self.downcast_mut_unchecked() })
158 } else {
159 None
160 }
161 }
162
163 #[inline]
165 pub fn downcast<T: Any>(self: Box<dyn NetworkStream>)
166 -> Result<Box<T>, Box<dyn NetworkStream>> {
167 if self.is::<T>() {
168 Ok(unsafe { self.downcast_unchecked() })
169 } else {
170 Err(self)
171 }
172 }
173}
174
175impl dyn NetworkStream + Send {
176 unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
177 mem::transmute(traitobject::data(self))
178 }
179
180 unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
181 mem::transmute(traitobject::data_mut(self))
182 }
183
184 unsafe fn downcast_unchecked<T: 'static>(self: Box<dyn NetworkStream + Send>) -> Box<T> {
185 let raw: *mut dyn NetworkStream = mem::transmute(self);
186 mem::transmute(traitobject::data_mut(raw))
187 }
188}
189
190impl dyn NetworkStream + Send {
191 #[inline]
193 pub fn is<T: Any>(&self) -> bool {
194 (*self).get_type() == TypeId::of::<T>()
195 }
196
197 #[inline]
199 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
200 if self.is::<T>() {
201 Some(unsafe { self.downcast_ref_unchecked() })
202 } else {
203 None
204 }
205 }
206
207 #[inline]
210 pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
211 if self.is::<T>() {
212 Some(unsafe { self.downcast_mut_unchecked() })
213 } else {
214 None
215 }
216 }
217
218 #[inline]
220 pub fn downcast<T: Any>(self: Box<dyn NetworkStream + Send>)
221 -> Result<Box<T>, Box<dyn NetworkStream + Send>> {
222 if self.is::<T>() {
223 Ok(unsafe { self.downcast_unchecked() })
224 } else {
225 Err(self)
226 }
227 }
228}
229
230#[derive(Clone)]
232pub struct HttpListener {
233 listener: Arc<TcpListener>,
234
235 read_timeout: Option<Duration>,
236 write_timeout: Option<Duration>,
237}
238
239impl From<TcpListener> for HttpListener {
240 fn from(listener: TcpListener) -> HttpListener {
241 HttpListener {
242 listener: Arc::new(listener),
243
244 read_timeout: None,
245 write_timeout: None,
246 }
247 }
248}
249
250impl HttpListener {
251 pub fn new<To: ToSocketAddrs>(addr: To) -> crate::Result<HttpListener> {
253 Ok(HttpListener::from(r#try!(TcpListener::bind(addr))))
254 }
255}
256
257impl NetworkListener for HttpListener {
258 type Stream = HttpStream;
259
260 #[inline]
261 fn accept(&mut self) -> crate::Result<HttpStream> {
262 let stream = HttpStream(r#try!(self.listener.accept()).0);
263 r#try!(stream.set_read_timeout(self.read_timeout));
264 r#try!(stream.set_write_timeout(self.write_timeout));
265 Ok(stream)
266 }
267
268 #[inline]
269 fn local_addr(&mut self) -> io::Result<SocketAddr> {
270 self.listener.local_addr()
271 }
272
273 fn set_read_timeout(&mut self, duration: Option<Duration>) {
274 self.read_timeout = duration;
275 }
276
277 fn set_write_timeout(&mut self, duration: Option<Duration>) {
278 self.write_timeout = duration;
279 }
280}
281
282#[cfg(windows)]
283impl ::std::os::windows::io::AsRawSocket for HttpListener {
284 fn as_raw_socket(&self) -> ::std::os::windows::io::RawSocket {
285 self.listener.as_raw_socket()
286 }
287}
288
289#[cfg(windows)]
290impl ::std::os::windows::io::FromRawSocket for HttpListener {
291 unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpListener {
292 HttpListener::from(TcpListener::from_raw_socket(sock))
293 }
294}
295
296#[cfg(unix)]
297impl ::std::os::unix::io::AsRawFd for HttpListener {
298 fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd {
299 self.listener.as_raw_fd()
300 }
301}
302
303#[cfg(unix)]
304impl ::std::os::unix::io::FromRawFd for HttpListener {
305 unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpListener {
306 HttpListener::from(TcpListener::from_raw_fd(fd))
307 }
308}
309
310pub struct HttpStream(pub TcpStream);
312
313impl Clone for HttpStream {
314 #[inline]
315 fn clone(&self) -> HttpStream {
316 HttpStream(self.0.try_clone().unwrap())
317 }
318}
319
320impl fmt::Debug for HttpStream {
321 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
322 f.write_str("HttpStream(_)")
323 }
324}
325
326impl Read for HttpStream {
327 #[inline]
328 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
329 self.0.read(buf)
330 }
331}
332
333impl Write for HttpStream {
334 #[inline]
335 fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
336 self.0.write(msg)
337 }
338 #[inline]
339 fn flush(&mut self) -> io::Result<()> {
340 self.0.flush()
341 }
342}
343
344#[cfg(windows)]
345impl ::std::os::windows::io::AsRawSocket for HttpStream {
346 fn as_raw_socket(&self) -> ::std::os::windows::io::RawSocket {
347 self.0.as_raw_socket()
348 }
349}
350
351#[cfg(windows)]
352impl ::std::os::windows::io::FromRawSocket for HttpStream {
353 unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpStream {
354 HttpStream(TcpStream::from_raw_socket(sock))
355 }
356}
357
358#[cfg(unix)]
359impl ::std::os::unix::io::AsRawFd for HttpStream {
360 fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd {
361 self.0.as_raw_fd()
362 }
363}
364
365#[cfg(unix)]
366impl ::std::os::unix::io::FromRawFd for HttpStream {
367 unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpStream {
368 HttpStream(TcpStream::from_raw_fd(fd))
369 }
370}
371
372impl NetworkStream for HttpStream {
373 #[inline]
374 fn peer_addr(&mut self) -> io::Result<SocketAddr> {
375 self.0.peer_addr()
376 }
377
378 #[inline]
379 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
380 self.0.set_read_timeout(dur)
381 }
382
383 #[inline]
384 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
385 self.0.set_write_timeout(dur)
386 }
387
388 #[inline]
389 fn close(&mut self, how: Shutdown) -> io::Result<()> {
390 match self.0.shutdown(how) {
391 Ok(_) => Ok(()),
392 Err(ref e) if e.kind() == ErrorKind::NotConnected => Ok(()),
394 err => err
395 }
396 }
397
398 fn set_nonblocking(&self, b: bool) {
399 #[cfg(unix)]
400 self.0.set_nonblocking(b);
401 }
402
403 fn reset_io(&self) {
404 #[cfg(unix)]
405 use cogo::io::WaitIo;
406 #[cfg(unix)]
407 self.0.reset_io();
408 }
409
410 fn wait_io(&self) {
411 #[cfg(unix)]
412 use cogo::io::WaitIo;
413 #[cfg(unix)]
414 self.0.wait_io();
415 }
416}
417
418#[derive(Debug, Clone, Default)]
420pub struct HttpConnector;
421
422impl NetworkConnector for HttpConnector {
423 type Stream = HttpStream;
424
425 fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<HttpStream> {
426 let addr = &(host, port);
427 Ok(r#try!(match scheme {
428 "http" => {
429 debug!("http scheme");
430 Ok(HttpStream(r#try!(TcpStream::connect(addr))))
431 },
432 _ => {
433 Err(io::Error::new(io::ErrorKind::InvalidInput,
434 "Invalid scheme for Http"))
435 }
436 }))
437 }
438}
439
440impl<F> NetworkConnector for F where F: Fn(&str, u16, &str) -> io::Result<TcpStream> {
462 type Stream = HttpStream;
463
464 fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<HttpStream> {
465 Ok(HttpStream(r#try!((*self)(host, port, scheme))))
466 }
467}
468
469pub trait SslClient<T: NetworkStream + Send + Clone = HttpStream> {
471 type Stream: NetworkStream + Send + Clone;
473 fn wrap_client(&self, stream: T, host: &str) -> crate::Result<Self::Stream>;
475}
476
477pub trait SslServer<T: NetworkStream + Send + Clone = HttpStream> {
479 type Stream: NetworkStream + Send + Clone;
481 fn wrap_server(&self, stream: T) -> crate::Result<Self::Stream>;
483}
484
485#[derive(Debug, Clone)]
487pub enum HttpsStream<S: NetworkStream> {
488 Http(HttpStream),
490 Https(S),
492}
493
494impl<S: NetworkStream> Read for HttpsStream<S> {
495 #[inline]
496 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
497 match *self {
498 HttpsStream::Http(ref mut s) => s.read(buf),
499 HttpsStream::Https(ref mut s) => s.read(buf)
500 }
501 }
502}
503
504impl<S: NetworkStream> Write for HttpsStream<S> {
505 #[inline]
506 fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
507 match *self {
508 HttpsStream::Http(ref mut s) => s.write(msg),
509 HttpsStream::Https(ref mut s) => s.write(msg)
510 }
511 }
512
513 #[inline]
514 fn flush(&mut self) -> io::Result<()> {
515 match *self {
516 HttpsStream::Http(ref mut s) => s.flush(),
517 HttpsStream::Https(ref mut s) => s.flush()
518 }
519 }
520}
521
522impl<S: NetworkStream> NetworkStream for HttpsStream<S> {
523 #[inline]
524 fn peer_addr(&mut self) -> io::Result<SocketAddr> {
525 match *self {
526 HttpsStream::Http(ref mut s) => s.peer_addr(),
527 HttpsStream::Https(ref mut s) => s.peer_addr()
528 }
529 }
530
531 #[inline]
532 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
533 match *self {
534 HttpsStream::Http(ref inner) => inner.0.set_read_timeout(dur),
535 HttpsStream::Https(ref inner) => inner.set_read_timeout(dur)
536 }
537 }
538
539 #[inline]
540 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
541 match *self {
542 HttpsStream::Http(ref inner) => inner.0.set_write_timeout(dur),
543 HttpsStream::Https(ref inner) => inner.set_write_timeout(dur)
544 }
545 }
546
547 #[inline]
548 fn close(&mut self, how: Shutdown) -> io::Result<()> {
549 match *self {
550 HttpsStream::Http(ref mut s) => s.close(how),
551 HttpsStream::Https(ref mut s) => s.close(how)
552 }
553 }
554 #[inline]
555 fn set_nonblocking(&self, b: bool) {
556 match *self {
557 HttpsStream::Http(ref s) => s.set_nonblocking(b),
558 HttpsStream::Https(ref s) => s.set_nonblocking(b)
559 }
560 }
561 #[inline]
562 fn reset_io(&self) {
563 match *self {
564 HttpsStream::Http(ref s) => s.reset_io(),
565 HttpsStream::Https(ref s) => s.reset_io()
566 }
567 }
568 #[inline]
569 fn wait_io(&self) {
570 match *self {
571 HttpsStream::Http(ref s) => s.wait_io(),
572 HttpsStream::Https(ref s) => s.wait_io()
573 }
574 }
575}
576
577#[derive(Clone)]
579pub struct HttpsListener<S: SslServer> {
580 listener: HttpListener,
581 ssl: S,
582}
583
584impl<S: SslServer> HttpsListener<S> {
585 pub fn new<To: ToSocketAddrs>(addr: To, ssl: S) -> crate::Result<HttpsListener<S>> {
587 HttpListener::new(addr).map(|l| HttpsListener {
588 listener: l,
589 ssl: ssl,
590 })
591 }
592
593 pub fn with_listener(listener: HttpListener, ssl: S) -> HttpsListener<S> {
595 HttpsListener {
596 listener: listener,
597 ssl: ssl,
598 }
599 }
600}
601
602impl<S: SslServer + Clone> NetworkListener for HttpsListener<S> {
603 type Stream = S::Stream;
604
605 #[inline]
606 fn accept(&mut self) -> crate::Result<S::Stream> {
607 self.listener.accept().and_then(|s| self.ssl.wrap_server(s))
608 }
609
610 #[inline]
611 fn local_addr(&mut self) -> io::Result<SocketAddr> {
612 self.listener.local_addr()
613 }
614
615 fn set_read_timeout(&mut self, duration: Option<Duration>) {
616 self.listener.set_read_timeout(duration)
617 }
618
619 fn set_write_timeout(&mut self, duration: Option<Duration>) {
620 self.listener.set_write_timeout(duration)
621 }
622}
623
624#[derive(Debug, Default)]
626pub struct HttpsConnector<S: SslClient, C: NetworkConnector = HttpConnector> {
627 ssl: S,
628 connector: C,
629}
630
631impl<S: SslClient> HttpsConnector<S, HttpConnector> {
632 pub fn new(s: S) -> HttpsConnector<S, HttpConnector> {
634 HttpsConnector::with_connector(s, HttpConnector)
635 }
636}
637
638impl<S: SslClient, C: NetworkConnector> HttpsConnector<S, C> {
639 pub fn with_connector(s: S, connector: C) -> HttpsConnector<S, C> {
641 HttpsConnector { ssl: s, connector: connector }
642 }
643}
644
645impl<S: SslClient, C: NetworkConnector<Stream=HttpStream>> NetworkConnector for HttpsConnector<S, C> {
646 type Stream = HttpsStream<S::Stream>;
647
648 fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Self::Stream> {
649 let stream = r#try!(self.connector.connect(host, port, "http"));
650 if scheme == "https" {
651 debug!("https scheme");
652 self.ssl.wrap_client(stream, host).map(HttpsStream::Https)
653 } else {
654 Ok(HttpsStream::Http(stream))
655 }
656 }
657}
658
659
660#[doc(hidden)]
661pub type DefaultConnector = HttpConnector;
662
663#[cfg(test)]
664mod tests {
665 use crate::mock::MockStream;
666 use super::{NetworkStream};
667
668 #[test]
669 fn test_downcast_box_stream() {
670 let stream: Box<dyn NetworkStream + Send> = Box::new(MockStream::new());
672
673 let mock = stream.downcast::<MockStream>().ok().unwrap();
674 assert_eq!(mock, Box::new(MockStream::new()));
675 }
676
677 #[test]
678 fn test_downcast_unchecked_box_stream() {
679 let stream: Box<dyn NetworkStream + Send> = Box::new(MockStream::new());
681
682 let mock = unsafe { stream.downcast_unchecked::<MockStream>() };
683 assert_eq!(mock, Box::new(MockStream::new()));
684 }
685}
686