1use std::{
2 future::Future,
3 net,
4 pin::Pin,
5 rc::Rc,
6 task::{Context, Poll},
7};
8
9use actix_codec::Framed;
10use actix_http::{h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead};
11use actix_service::Service;
12use futures_core::{future::LocalBoxFuture, ready};
13
14use crate::{
15 any_body::AnyBody,
16 client::{
17 Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
18 },
19 ClientResponse,
20};
21
22pub type BoxConnectorService = Rc<
23 dyn Service<
24 ConnectRequest,
25 Response = ConnectResponse,
26 Error = SendRequestError,
27 Future = LocalBoxFuture<'static, Result<ConnectResponse, SendRequestError>>,
28 >,
29>;
30
31pub type BoxedSocket = Box<dyn ConnectionIo>;
32
33pub enum ConnectRequest {
35 Client(RequestHeadType, AnyBody, Option<net::SocketAddr>),
39
40 Tunnel(RequestHead, Option<net::SocketAddr>),
44}
45
46pub enum ConnectResponse {
48 Client(ClientResponse),
50
51 Tunnel(ResponseHead, Framed<BoxedSocket, ClientCodec>),
55}
56
57impl ConnectResponse {
58 pub fn into_client_response(self) -> ClientResponse {
63 match self {
64 ConnectResponse::Client(res) => res,
65 _ => panic!(
66 "ClientResponse only reachable with ConnectResponse::ClientResponse variant"
67 ),
68 }
69 }
70
71 pub fn into_tunnel_response(self) -> (ResponseHead, Framed<BoxedSocket, ClientCodec>) {
76 match self {
77 ConnectResponse::Tunnel(head, framed) => (head, framed),
78 _ => panic!(
79 "TunnelResponse only reachable with ConnectResponse::TunnelResponse variant"
80 ),
81 }
82 }
83}
84
/// Connector service that wraps an inner connector `S`.
///
/// When `S` is a service producing [`Connection`]s, this type is itself a service over
/// [`ConnectRequest`].
pub struct DefaultConnector<S> {
    /// Inner service used to establish connections.
    connector: S,
}
88
89impl<S> DefaultConnector<S> {
90 pub(crate) fn new(connector: S) -> Self {
91 Self { connector }
92 }
93}
94
95impl<S, Io> Service<ConnectRequest> for DefaultConnector<S>
96where
97 S: Service<ClientConnect, Error = ConnectError, Response = Connection<Io>>,
98 Io: ConnectionIo,
99{
100 type Response = ConnectResponse;
101 type Error = SendRequestError;
102 type Future = ConnectRequestFuture<S::Future, Io>;
103
104 actix_service::forward_ready!(connector);
105
106 fn call(&self, req: ConnectRequest) -> Self::Future {
107 let fut = match req {
109 ConnectRequest::Client(ref head, .., addr) => self.connector.call(ClientConnect {
110 uri: head.as_ref().uri.clone(),
111 addr,
112 }),
113 ConnectRequest::Tunnel(ref head, addr) => self.connector.call(ClientConnect {
114 uri: head.uri.clone(),
115 addr,
116 }),
117 };
118
119 ConnectRequestFuture::Connection {
120 fut,
121 req: Some(req),
122 }
123 }
124}
125
126pin_project_lite::pin_project! {
127 #[project = ConnectRequestProj]
128 pub enum ConnectRequestFuture<Fut, Io>
129 where
130 Io: ConnectionIo
131 {
132 Connection {
133 #[pin]
134 fut: Fut,
135 req: Option<ConnectRequest>
136 },
137 Client {
138 fut: LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
139 },
140 Tunnel {
141 fut: LocalBoxFuture<
142 'static,
143 Result<(ResponseHead, Framed<Connection<Io>, ClientCodec>), SendRequestError>,
144 >,
145 }
146 }
147}
148
149impl<Fut, Io> Future for ConnectRequestFuture<Fut, Io>
150where
151 Fut: Future<Output = Result<Connection<Io>, ConnectError>>,
152 Io: ConnectionIo,
153{
154 type Output = Result<ConnectResponse, SendRequestError>;
155
156 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157 match self.as_mut().project() {
158 ConnectRequestProj::Connection { fut, req } => {
159 let connection = ready!(fut.poll(cx))?;
160 let req = req.take().unwrap();
161
162 match req {
163 ConnectRequest::Client(head, body, ..) => {
164 let fut = ConnectRequestFuture::Client {
166 fut: connection.send_request(head, body),
167 };
168
169 self.set(fut);
170 }
171
172 ConnectRequest::Tunnel(head, ..) => {
173 let fut = ConnectRequestFuture::Tunnel {
175 fut: connection.open_tunnel(RequestHeadType::from(head)),
176 };
177
178 self.set(fut);
179 }
180 }
181
182 self.poll(cx)
183 }
184
185 ConnectRequestProj::Client { fut } => {
186 let (head, payload) = ready!(fut.as_mut().poll(cx))?;
187 Poll::Ready(Ok(ConnectResponse::Client(ClientResponse::new(
188 head, payload,
189 ))))
190 }
191
192 ConnectRequestProj::Tunnel { fut } => {
193 let (head, framed) = ready!(fut.as_mut().poll(cx))?;
194 let framed = framed.into_map_io(|io| Box::new(io) as _);
195 Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed)))
196 }
197 }
198 }
199}