simple_hyper_client/connector/
http.rs1use crate::connector::{NetworkConnection, NetworkConnector};
8use crate::connector_impl::connect;
9use hyper::client::connect::{Connected, Connection};
10use hyper::Uri;
11use std::error::Error as StdError;
12use std::future::Future;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use std::time::Duration;
16use std::{fmt, io};
17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
18use tokio::net::TcpStream;
19
/// Port number 80 for `http`. Not referenced elsewhere in this file;
/// presumably the fallback when a URI carries no explicit port — confirm at
/// the call sites.
20pub(crate) const DEFAULT_HTTP_PORT: u16 = 80;
/// Port number 443 for `https`. Not referenced elsewhere in this file;
/// presumably the fallback when a URI carries no explicit port — confirm at
/// the call sites.
21pub(crate) const DEFAULT_HTTPS_PORT: u16 = 443;
22
/// Connector configuration holding an optional connect timeout.
///
/// The timeout is handed to the crate's `connect` routine each time a
/// connection is requested through the `NetworkConnector` implementation.
/// `HttpConnector::default()` and [`HttpConnector::new`] are equivalent and
/// both leave the timeout unset.
#[derive(Clone, Debug, Default)]
pub struct HttpConnector {
    connect_timeout: Option<Duration>,
}

impl HttpConnector {
    /// Creates a connector with no connect timeout configured.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the connect timeout and returns the updated connector.
    ///
    /// Passing `None` clears a previously configured timeout.
    pub fn connect_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.connect_timeout = timeout;
        self
    }
}
46
47impl NetworkConnector for HttpConnector {
48 fn connect(
49 &self,
50 uri: Uri,
51 ) -> Pin<
52 Box<dyn Future<Output = Result<NetworkConnection, Box<dyn StdError + Send + Sync>>> + Send>,
53 > {
54 let connect_timeout = self.connect_timeout;
55 Box::pin(async move {
56 match connect(uri, false, connect_timeout).await {
57 Ok(conn) => Ok(NetworkConnection::new(conn)),
58 Err(e) => Err(Box::new(e) as _),
59 }
60 })
61 }
62}
63
64pub struct HttpConnection {
68 pub(crate) stream: TcpStream,
69}
70
71impl Connection for HttpConnection {
72 fn connected(&self) -> Connected {
73 Connected::new()
75 }
76}
77
78impl HttpConnection {
79 pub fn into_tcp_stream(self) -> TcpStream {
80 self.stream
81 }
82}
83
84impl AsyncRead for HttpConnection {
85 fn poll_read(
86 self: Pin<&mut Self>,
87 cx: &mut Context<'_>,
88 buf: &mut ReadBuf<'_>,
89 ) -> Poll<io::Result<()>> {
90 Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
91 }
92}
93
94impl AsyncWrite for HttpConnection {
95 fn poll_write(
96 self: Pin<&mut Self>,
97 cx: &mut Context<'_>,
98 buf: &[u8],
99 ) -> Poll<io::Result<usize>> {
100 Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
101 }
102
103 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
104 Pin::new(&mut self.get_mut().stream).poll_flush(cx)
105 }
106
107 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
108 Pin::new(&mut self.get_mut().stream).poll_shutdown(cx)
109 }
110}
111
/// An error made of a fixed description plus, optionally, the underlying
/// failure that caused it.
pub struct ConnectError {
    msg: &'static str,
    cause: Option<Box<dyn StdError + Send + Sync>>,
}

impl ConnectError {
    /// Builds an error that carries only a message and no cause.
    pub fn new(msg: &'static str) -> Self {
        Self { msg, cause: None }
    }

    /// Attaches `cause` as the underlying error, replacing any cause that was
    /// set earlier, and returns the updated error.
    pub fn cause<E: Into<Box<dyn StdError + Send + Sync>>>(self, cause: E) -> Self {
        Self {
            msg: self.msg,
            cause: Some(cause.into()),
        }
    }
}

impl fmt::Debug for ConnectError {
    /// Renders `ConnectError(msg, cause)` when a cause is present; otherwise
    /// only the debug form of the message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.cause {
            Some(cause) => f
                .debug_tuple("ConnectError")
                .field(&self.msg)
                .field(cause)
                .finish(),
            None => fmt::Debug::fmt(self.msg, f),
        }
    }
}

impl fmt::Display for ConnectError {
    /// Writes the message, followed by `": "` and the cause when one is set.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.cause {
            Some(cause) => write!(f, "{}: {}", self.msg, cause),
            None => f.write_str(self.msg),
        }
    }
}

impl StdError for ConnectError {
    /// Exposes the attached cause, if any, as the error source.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        self.cause
            .as_deref()
            .map(|err| err as &(dyn StdError + 'static))
    }
}
155}