1use std::any::{Any, TypeId};
3use std::fmt;
4use std::io::{self, ErrorKind, Read, Write};
5use std::net::{SocketAddr, ToSocketAddrs, TcpStream, TcpListener, Shutdown};
6use std::mem;
7use std::sync::Arc;
8
9use std::time::Duration;
10
11use typeable::Typeable;
12use traitobject;
13
/// Uninhabited marker type: it declares no variants, so no value of it can be built.
///
/// NOTE(review): presumably used only as a type-level state tag by code outside
/// this file — confirm against its users.
pub enum Fresh {}
16
/// Uninhabited marker type: it declares no variants, so no value of it can be built.
///
/// NOTE(review): presumably used only as a type-level state tag by code outside
/// this file — confirm against its users.
pub enum Streaming {}
19
20pub trait NetworkListener: Clone {
22 type Stream: NetworkStream + Send + Clone;
24
25 fn accept(&mut self) -> crate::Result<Self::Stream>;
27
28 fn local_addr(&mut self) -> io::Result<SocketAddr>;
30
31 fn incoming(&mut self) -> NetworkConnections<Self> {
33 NetworkConnections(self)
34 }
35
36 fn set_read_timeout(&mut self, _: Option<Duration>) {
38 warn!("Ignoring read timeout");
42 }
43
44 fn set_write_timeout(&mut self, _: Option<Duration>) {
46 warn!("Ignoring write timeout");
50 }
51}
52
53pub struct NetworkConnections<'a, N: NetworkListener + 'a>(&'a mut N);
55
56impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
57 type Item = crate::Result<N::Stream>;
58 fn next(&mut self) -> Option<crate::Result<N::Stream>> {
59 Some(self.0.accept())
60 }
61}
62
63pub trait NetworkStream: Read + Write + Any + Send + Typeable {
65 fn peer_addr(&mut self) -> io::Result<SocketAddr>;
67
68 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
70
71 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
73
74 #[inline]
76 fn close(&mut self, _how: Shutdown) -> io::Result<()> {
77 Ok(())
78 }
79
80 #[doc(hidden)]
83 fn set_previous_response_expected_no_content(&mut self, _expected: bool) { }
84
85 #[doc(hidden)]
86 fn previous_response_expected_no_content(&self) -> bool {
87 false
88 }
89}
90
91pub trait NetworkConnector {
93 type Stream: Into<Box<dyn NetworkStream + Send>>;
95
96 fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Self::Stream>;
98}
99
100impl<T: NetworkStream + Send> From<T> for Box<dyn NetworkStream + Send> {
101 fn from(s: T) -> Box<dyn NetworkStream + Send> {
102 Box::new(s)
103 }
104}
105
106impl fmt::Debug for Box<dyn NetworkStream + Send> {
107 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
108 fmt.pad("Box<NetworkStream>")
109 }
110}
111
112impl dyn NetworkStream {
113 unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
114 mem::transmute(traitobject::data(self))
115 }
116
117 unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
118 mem::transmute(traitobject::data_mut(self))
119 }
120
121 unsafe fn downcast_unchecked<T: 'static>(self: Box<dyn NetworkStream>) -> Box<T> {
122 let raw: *mut dyn NetworkStream = mem::transmute(self);
123 mem::transmute(traitobject::data_mut(raw))
124 }
125}
126
127impl dyn NetworkStream {
128 #[inline]
130 pub fn is<T: Any>(&self) -> bool {
131 (*self).get_type() == TypeId::of::<T>()
132 }
133
134 #[inline]
136 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
137 if self.is::<T>() {
138 Some(unsafe { self.downcast_ref_unchecked() })
139 } else {
140 None
141 }
142 }
143
144 #[inline]
147 pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
148 if self.is::<T>() {
149 Some(unsafe { self.downcast_mut_unchecked() })
150 } else {
151 None
152 }
153 }
154
155 #[inline]
157 pub fn downcast<T: Any>(self: Box<dyn NetworkStream>)
158 -> Result<Box<T>, Box<dyn NetworkStream>> {
159 if self.is::<T>() {
160 Ok(unsafe { self.downcast_unchecked() })
161 } else {
162 Err(self)
163 }
164 }
165}
166
167impl dyn NetworkStream + Send {
168 unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
169 mem::transmute(traitobject::data(self))
170 }
171
172 unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
173 mem::transmute(traitobject::data_mut(self))
174 }
175
176 unsafe fn downcast_unchecked<T: 'static>(self: Box<dyn NetworkStream + Send>) -> Box<T> {
177 let raw: *mut dyn NetworkStream = mem::transmute(self);
178 mem::transmute(traitobject::data_mut(raw))
179 }
180}
181
182impl dyn NetworkStream + Send {
183 #[inline]
185 pub fn is<T: Any>(&self) -> bool {
186 (*self).get_type() == TypeId::of::<T>()
187 }
188
189 #[inline]
191 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
192 if self.is::<T>() {
193 Some(unsafe { self.downcast_ref_unchecked() })
194 } else {
195 None
196 }
197 }
198
199 #[inline]
202 pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
203 if self.is::<T>() {
204 Some(unsafe { self.downcast_mut_unchecked() })
205 } else {
206 None
207 }
208 }
209
210 #[inline]
212 pub fn downcast<T: Any>(self: Box<dyn NetworkStream + Send>)
213 -> Result<Box<T>, Box<dyn NetworkStream + Send>> {
214 if self.is::<T>() {
215 Ok(unsafe { self.downcast_unchecked() })
216 } else {
217 Err(self)
218 }
219 }
220}
221
/// A listener over a reference-counted `TcpListener`.
///
/// Clones share the same underlying `TcpListener` through the `Arc`; each
/// clone carries its own copy of the two timeout settings.
#[derive(Clone)]
pub struct HttpListener {
    /// The shared listening socket.
    listener: Arc<TcpListener>,

    /// Read timeout applied to each accepted stream.
    read_timeout: Option<Duration>,
    /// Write timeout applied to each accepted stream.
    write_timeout: Option<Duration>,
}
230
231impl From<TcpListener> for HttpListener {
232 fn from(listener: TcpListener) -> HttpListener {
233 HttpListener {
234 listener: Arc::new(listener),
235
236 read_timeout : None,
237 write_timeout: None,
238 }
239 }
240}
241
242impl HttpListener {
243 pub fn new<To: ToSocketAddrs>(addr: To) -> crate::Result<HttpListener> {
245 Ok(HttpListener::from(TcpListener::bind(addr)?))
246 }
247}
248
249impl NetworkListener for HttpListener {
250 type Stream = HttpStream;
251
252 #[inline]
253 fn accept(&mut self) -> crate::Result<HttpStream> {
254 let stream = HttpStream(self.listener.accept()?.0);
255 stream.set_read_timeout(self.read_timeout)?;
256 stream.set_write_timeout(self.write_timeout)?;
257 Ok(stream)
258 }
259
260 #[inline]
261 fn local_addr(&mut self) -> io::Result<SocketAddr> {
262 self.listener.local_addr()
263 }
264
265 fn set_read_timeout(&mut self, duration: Option<Duration>) {
266 self.read_timeout = duration;
267 }
268
269 fn set_write_timeout(&mut self, duration: Option<Duration>) {
270 self.write_timeout = duration;
271 }
272}
273
#[cfg(windows)]
impl ::std::os::windows::io::AsRawSocket for HttpListener {
    /// Returns the raw socket of the wrapped `TcpListener`.
    fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
        let tcp: &TcpListener = &self.listener;
        tcp.as_raw_socket()
    }
}
280
#[cfg(windows)]
impl ::std::os::windows::io::FromRawSocket for HttpListener {
    /// Builds a listener from a raw socket via `TcpListener::from_raw_socket`.
    ///
    /// # Safety
    ///
    /// Same contract as `TcpListener::from_raw_socket`, to which `sock` is passed.
    unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpListener {
        let tcp = TcpListener::from_raw_socket(sock);
        HttpListener::from(tcp)
    }
}
287
288#[cfg(unix)]
289impl ::std::os::unix::io::AsRawFd for HttpListener {
290 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
291 self.listener.as_raw_fd()
292 }
293}
294
295#[cfg(unix)]
296impl ::std::os::unix::io::FromRawFd for HttpListener {
297 unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpListener {
298 HttpListener::from(TcpListener::from_raw_fd(fd))
299 }
300}
301
/// Newtype around a `TcpStream`; the inner stream is publicly accessible as `.0`.
pub struct HttpStream(pub TcpStream);
304
305impl Clone for HttpStream {
306 #[inline]
307 fn clone(&self) -> HttpStream {
308 HttpStream(self.0.try_clone().unwrap())
309 }
310}
311
312impl fmt::Debug for HttpStream {
313 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
314 f.write_str("HttpStream(_)")
315 }
316}
317
318impl Read for HttpStream {
319 #[inline]
320 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
321 self.0.read(buf)
322 }
323}
324
325impl Write for HttpStream {
326 #[inline]
327 fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
328 self.0.write(msg)
329 }
330 #[inline]
331 fn flush(&mut self) -> io::Result<()> {
332 self.0.flush()
333 }
334}
335
#[cfg(windows)]
impl ::std::os::windows::io::AsRawSocket for HttpStream {
    /// Returns the raw socket of the wrapped `TcpStream`.
    fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
        let tcp: &TcpStream = &self.0;
        tcp.as_raw_socket()
    }
}
342
#[cfg(windows)]
impl ::std::os::windows::io::FromRawSocket for HttpStream {
    /// Builds a stream from a raw socket via `TcpStream::from_raw_socket`.
    ///
    /// # Safety
    ///
    /// Same contract as `TcpStream::from_raw_socket`, to which `sock` is passed.
    unsafe fn from_raw_socket(sock: ::std::os::windows::io::RawSocket) -> HttpStream {
        let tcp = TcpStream::from_raw_socket(sock);
        HttpStream(tcp)
    }
}
349
350#[cfg(unix)]
351impl ::std::os::unix::io::AsRawFd for HttpStream {
352 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
353 self.0.as_raw_fd()
354 }
355}
356
357#[cfg(unix)]
358impl ::std::os::unix::io::FromRawFd for HttpStream {
359 unsafe fn from_raw_fd(fd: ::std::os::unix::io::RawFd) -> HttpStream {
360 HttpStream(TcpStream::from_raw_fd(fd))
361 }
362}
363
364impl NetworkStream for HttpStream {
365 #[inline]
366 fn peer_addr(&mut self) -> io::Result<SocketAddr> {
367 self.0.peer_addr()
368 }
369
370 #[inline]
371 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
372 self.0.set_read_timeout(dur)
373 }
374
375 #[inline]
376 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
377 self.0.set_write_timeout(dur)
378 }
379
380 #[inline]
381 fn close(&mut self, how: Shutdown) -> io::Result<()> {
382 match self.0.shutdown(how) {
383 Ok(_) => Ok(()),
384 Err(ref e) if e.kind() == ErrorKind::NotConnected => Ok(()),
386 err => err
387 }
388 }
389}
390
/// Stateless connector: a unit struct with derived `Debug`, `Clone` and `Default`.
#[derive(Debug, Clone, Default)]
pub struct HttpConnector;
394
395impl NetworkConnector for HttpConnector {
396 type Stream = HttpStream;
397
398 fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<HttpStream> {
399 let addr = &(host, port);
400 Ok(match scheme {
401 "http" => {
402 debug!("http scheme");
403 Ok(HttpStream(TcpStream::connect(addr)?))
404 },
405 _ => {
406 Err(io::Error::new(io::ErrorKind::InvalidInput,
407 "Invalid scheme for Http"))
408 }
409 }?)
410 }
411}
412
413impl<F> NetworkConnector for F where F: Fn(&str, u16, &str) -> io::Result<TcpStream> {
435 type Stream = HttpStream;
436
437 fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<HttpStream> {
438 Ok(HttpStream((*self)(host, port, scheme)?))
439 }
440}
441
442pub trait SslClient<T: NetworkStream + Send + Clone = HttpStream> {
444 type Stream: NetworkStream + Send + Clone;
446 fn wrap_client(&self, stream: T, host: &str) -> crate::Result<Self::Stream>;
448}
449
450pub trait SslServer<T: NetworkStream + Send + Clone = HttpStream> {
452 type Stream: NetworkStream + Send + Clone;
454 fn wrap_server(&self, stream: T) -> crate::Result<Self::Stream>;
456}
457
458#[derive(Debug, Clone)]
460pub enum HttpsStream<S: NetworkStream> {
461 Http(HttpStream),
463 Https(S)
465}
466
467impl<S: NetworkStream> Read for HttpsStream<S> {
468 #[inline]
469 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
470 match *self {
471 HttpsStream::Http(ref mut s) => s.read(buf),
472 HttpsStream::Https(ref mut s) => s.read(buf)
473 }
474 }
475}
476
477impl<S: NetworkStream> Write for HttpsStream<S> {
478 #[inline]
479 fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
480 match *self {
481 HttpsStream::Http(ref mut s) => s.write(msg),
482 HttpsStream::Https(ref mut s) => s.write(msg)
483 }
484 }
485
486 #[inline]
487 fn flush(&mut self) -> io::Result<()> {
488 match *self {
489 HttpsStream::Http(ref mut s) => s.flush(),
490 HttpsStream::Https(ref mut s) => s.flush()
491 }
492 }
493}
494
495impl<S: NetworkStream> NetworkStream for HttpsStream<S> {
496 #[inline]
497 fn peer_addr(&mut self) -> io::Result<SocketAddr> {
498 match *self {
499 HttpsStream::Http(ref mut s) => s.peer_addr(),
500 HttpsStream::Https(ref mut s) => s.peer_addr()
501 }
502 }
503
504 #[inline]
505 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
506 match *self {
507 HttpsStream::Http(ref inner) => inner.0.set_read_timeout(dur),
508 HttpsStream::Https(ref inner) => inner.set_read_timeout(dur)
509 }
510 }
511
512 #[inline]
513 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
514 match *self {
515 HttpsStream::Http(ref inner) => inner.0.set_write_timeout(dur),
516 HttpsStream::Https(ref inner) => inner.set_write_timeout(dur)
517 }
518 }
519
520 #[inline]
521 fn close(&mut self, how: Shutdown) -> io::Result<()> {
522 match *self {
523 HttpsStream::Http(ref mut s) => s.close(how),
524 HttpsStream::Https(ref mut s) => s.close(how)
525 }
526 }
527}
528
529#[derive(Clone)]
531pub struct HttpsListener<S: SslServer> {
532 listener: HttpListener,
533 ssl: S,
534}
535
536impl<S: SslServer> HttpsListener<S> {
537 pub fn new<To: ToSocketAddrs>(addr: To, ssl: S) -> crate::Result<HttpsListener<S>> {
539 HttpListener::new(addr).map(|l| HttpsListener {
540 listener: l,
541 ssl: ssl
542 })
543 }
544
545 pub fn with_listener(listener: HttpListener, ssl: S) -> HttpsListener<S> {
547 HttpsListener {
548 listener: listener,
549 ssl: ssl
550 }
551 }
552}
553
554impl<S: SslServer + Clone> NetworkListener for HttpsListener<S> {
555 type Stream = S::Stream;
556
557 #[inline]
558 fn accept(&mut self) -> crate::Result<S::Stream> {
559 self.listener.accept().and_then(|s| self.ssl.wrap_server(s))
560 }
561
562 #[inline]
563 fn local_addr(&mut self) -> io::Result<SocketAddr> {
564 self.listener.local_addr()
565 }
566
567 fn set_read_timeout(&mut self, duration: Option<Duration>) {
568 self.listener.set_read_timeout(duration)
569 }
570
571 fn set_write_timeout(&mut self, duration: Option<Duration>) {
572 self.listener.set_write_timeout(duration)
573 }
574}
575
576#[derive(Debug, Default)]
578pub struct HttpsConnector<S: SslClient, C: NetworkConnector = HttpConnector> {
579 ssl: S,
580 connector: C,
581}
582
583impl<S: SslClient> HttpsConnector<S, HttpConnector> {
584 pub fn new(s: S) -> HttpsConnector<S, HttpConnector> {
586 HttpsConnector::with_connector(s, HttpConnector)
587 }
588}
589
590impl<S: SslClient, C: NetworkConnector> HttpsConnector<S, C> {
591 pub fn with_connector(s: S, connector: C) -> HttpsConnector<S, C> {
593 HttpsConnector { ssl: s, connector: connector }
594 }
595}
596
597impl<S: SslClient, C: NetworkConnector<Stream=HttpStream>> NetworkConnector for HttpsConnector<S, C> {
598 type Stream = HttpsStream<S::Stream>;
599
600 fn connect(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Self::Stream> {
601 let stream = self.connector.connect(host, port, "http")?;
602 if scheme == "https" {
603 debug!("https scheme");
604 self.ssl.wrap_client(stream, host).map(HttpsStream::Https)
605 } else {
606 Ok(HttpsStream::Http(stream))
607 }
608 }
609}
610
611
612#[doc(hidden)]
613pub type DefaultConnector = HttpConnector;
614
#[cfg(test)]
mod tests {
    use crate::mock::MockStream;
    use super::NetworkStream;

    /// Builds a fresh mock stream erased to `Box<dyn NetworkStream + Send>`.
    fn boxed_mock() -> Box<dyn NetworkStream + Send> {
        Box::new(MockStream::new())
    }

    /// The checked downcast recovers the concrete `MockStream`.
    #[test]
    fn test_downcast_box_stream() {
        let stream = boxed_mock();

        let recovered = stream.downcast::<MockStream>().ok().unwrap();
        assert_eq!(recovered, Box::new(MockStream::new()));
    }

    /// The unchecked downcast recovers the concrete `MockStream` when the type is right.
    #[test]
    fn test_downcast_unchecked_box_stream() {
        let stream = boxed_mock();

        // SAFETY: `boxed_mock` always boxes a `MockStream`.
        let recovered = unsafe { stream.downcast_unchecked::<MockStream>() };
        assert_eq!(recovered, Box::new(MockStream::new()));
    }
}
638