awc/
connect.rs

1use std::{
2    future::Future,
3    net,
4    pin::Pin,
5    rc::Rc,
6    task::{Context, Poll},
7};
8
9use actix_codec::Framed;
10use actix_http::{h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead};
11use actix_service::Service;
12use futures_core::{future::LocalBoxFuture, ready};
13
14use crate::{
15    any_body::AnyBody,
16    client::{
17        Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
18    },
19    ClientResponse,
20};
21
22pub type BoxConnectorService = Rc<
23    dyn Service<
24        ConnectRequest,
25        Response = ConnectResponse,
26        Error = SendRequestError,
27        Future = LocalBoxFuture<'static, Result<ConnectResponse, SendRequestError>>,
28    >,
29>;
30
31pub type BoxedSocket = Box<dyn ConnectionIo>;
32
33/// Combined HTTP and WebSocket request type received by connection service.
34pub enum ConnectRequest {
35    /// Standard HTTP request.
36    ///
37    /// Contains the request head, body type, and optional pre-resolved socket address.
38    Client(RequestHeadType, AnyBody, Option<net::SocketAddr>),
39
40    /// Tunnel used by WebSocket connection requests.
41    ///
42    /// Contains the request head and optional pre-resolved socket address.
43    Tunnel(RequestHead, Option<net::SocketAddr>),
44}
45
46/// Combined HTTP response & WebSocket tunnel type returned from connection service.
47pub enum ConnectResponse {
48    /// Standard HTTP response.
49    Client(ClientResponse),
50
51    /// Tunnel used for WebSocket communication.
52    ///
53    /// Contains response head and framed HTTP/1.1 codec.
54    Tunnel(ResponseHead, Framed<BoxedSocket, ClientCodec>),
55}
56
57impl ConnectResponse {
58    /// Unwraps type into HTTP response.
59    ///
60    /// # Panics
61    /// Panics if enum variant is not `Client`.
62    pub fn into_client_response(self) -> ClientResponse {
63        match self {
64            ConnectResponse::Client(res) => res,
65            _ => panic!(
66                "ClientResponse only reachable with ConnectResponse::ClientResponse variant"
67            ),
68        }
69    }
70
71    /// Unwraps type into WebSocket tunnel response.
72    ///
73    /// # Panics
74    /// Panics if enum variant is not `Tunnel`.
75    pub fn into_tunnel_response(self) -> (ResponseHead, Framed<BoxedSocket, ClientCodec>) {
76        match self {
77            ConnectResponse::Tunnel(head, framed) => (head, framed),
78            _ => panic!(
79                "TunnelResponse only reachable with ConnectResponse::TunnelResponse variant"
80            ),
81        }
82    }
83}
84
/// Connection service that wraps an inner connector `S`.
///
/// When `S` is a service resolving [`ClientConnect`] into a [`Connection`], this type is a
/// service from [`ConnectRequest`] to [`ConnectResponse`].
pub struct DefaultConnector<S> {
    /// Inner service used to establish connections.
    connector: S,
}
88
89impl<S> DefaultConnector<S> {
90    pub(crate) fn new(connector: S) -> Self {
91        Self { connector }
92    }
93}
94
95impl<S, Io> Service<ConnectRequest> for DefaultConnector<S>
96where
97    S: Service<ClientConnect, Error = ConnectError, Response = Connection<Io>>,
98    Io: ConnectionIo,
99{
100    type Response = ConnectResponse;
101    type Error = SendRequestError;
102    type Future = ConnectRequestFuture<S::Future, Io>;
103
104    actix_service::forward_ready!(connector);
105
106    fn call(&self, req: ConnectRequest) -> Self::Future {
107        // connect to the host
108        let fut = match req {
109            ConnectRequest::Client(ref head, .., addr) => self.connector.call(ClientConnect {
110                uri: head.as_ref().uri.clone(),
111                addr,
112            }),
113            ConnectRequest::Tunnel(ref head, addr) => self.connector.call(ClientConnect {
114                uri: head.uri.clone(),
115                addr,
116            }),
117        };
118
119        ConnectRequestFuture::Connection {
120            fut,
121            req: Some(req),
122        }
123    }
124}
125
126pin_project_lite::pin_project! {
127    #[project = ConnectRequestProj]
128    pub enum ConnectRequestFuture<Fut, Io>
129    where
130        Io: ConnectionIo
131    {
132        Connection {
133            #[pin]
134            fut: Fut,
135            req: Option<ConnectRequest>
136        },
137        Client {
138            fut: LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
139        },
140        Tunnel {
141            fut: LocalBoxFuture<
142                'static,
143                Result<(ResponseHead, Framed<Connection<Io>, ClientCodec>), SendRequestError>,
144            >,
145        }
146    }
147}
148
149impl<Fut, Io> Future for ConnectRequestFuture<Fut, Io>
150where
151    Fut: Future<Output = Result<Connection<Io>, ConnectError>>,
152    Io: ConnectionIo,
153{
154    type Output = Result<ConnectResponse, SendRequestError>;
155
156    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157        match self.as_mut().project() {
158            ConnectRequestProj::Connection { fut, req } => {
159                let connection = ready!(fut.poll(cx))?;
160                let req = req.take().unwrap();
161
162                match req {
163                    ConnectRequest::Client(head, body, ..) => {
164                        // send request
165                        let fut = ConnectRequestFuture::Client {
166                            fut: connection.send_request(head, body),
167                        };
168
169                        self.set(fut);
170                    }
171
172                    ConnectRequest::Tunnel(head, ..) => {
173                        // send request
174                        let fut = ConnectRequestFuture::Tunnel {
175                            fut: connection.open_tunnel(RequestHeadType::from(head)),
176                        };
177
178                        self.set(fut);
179                    }
180                }
181
182                self.poll(cx)
183            }
184
185            ConnectRequestProj::Client { fut } => {
186                let (head, payload) = ready!(fut.as_mut().poll(cx))?;
187                Poll::Ready(Ok(ConnectResponse::Client(ClientResponse::new(
188                    head, payload,
189                ))))
190            }
191
192            ConnectRequestProj::Tunnel { fut } => {
193                let (head, framed) = ready!(fut.as_mut().poll(cx))?;
194                let framed = framed.into_map_io(|io| Box::new(io) as _);
195                Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed)))
196            }
197        }
198    }
199}