actix_tls/connect/
connector.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use actix_rt::net::TcpStream;
8use actix_service::{Service, ServiceFactory};
9use actix_utils::future::{ok, Ready};
10use futures_core::ready;
11
12use super::{
13    error::ConnectError,
14    resolver::{Resolver, ResolverService},
15    tcp::{TcpConnector, TcpConnectorService},
16    ConnectInfo, Connection, Host,
17};
18
19/// Combined resolver and TCP connector service factory.
20///
21/// Used to create [`ConnectorService`]s which receive connection information, resolve DNS if
22/// required, and return a TCP stream.
23#[derive(Clone, Default)]
24pub struct Connector {
25    resolver: Resolver,
26}
27
28impl Connector {
29    /// Constructs new connector factory with the given resolver.
30    pub fn new(resolver: Resolver) -> Self {
31        Connector { resolver }
32    }
33
34    /// Build connector service.
35    pub fn service(&self) -> ConnectorService {
36        ConnectorService {
37            tcp: TcpConnector::default().service(),
38            resolver: self.resolver.service(),
39        }
40    }
41}
42
43impl<R: Host> ServiceFactory<ConnectInfo<R>> for Connector {
44    type Response = Connection<R, TcpStream>;
45    type Error = ConnectError;
46    type Config = ();
47    type Service = ConnectorService;
48    type InitError = ();
49    type Future = Ready<Result<Self::Service, Self::InitError>>;
50
51    fn new_service(&self, _: ()) -> Self::Future {
52        ok(self.service())
53    }
54}
55
56/// Combined resolver and TCP connector service.
57///
58/// Service implementation receives connection information, resolves DNS if required, and returns
59/// a TCP stream.
60#[derive(Clone, Default)]
61pub struct ConnectorService {
62    tcp: TcpConnectorService,
63    resolver: ResolverService,
64}
65
66impl<R: Host> Service<ConnectInfo<R>> for ConnectorService {
67    type Response = Connection<R, TcpStream>;
68    type Error = ConnectError;
69    type Future = ConnectServiceResponse<R>;
70
71    actix_service::always_ready!();
72
73    fn call(&self, req: ConnectInfo<R>) -> Self::Future {
74        ConnectServiceResponse {
75            fut: ConnectFut::Resolve(self.resolver.call(req)),
76            tcp: self.tcp,
77        }
78    }
79}
80
81/// Chains futures of resolve and connect steps.
82pub(crate) enum ConnectFut<R: Host> {
83    Resolve(<ResolverService as Service<ConnectInfo<R>>>::Future),
84    Connect(<TcpConnectorService as Service<ConnectInfo<R>>>::Future),
85}
86
87/// Container for the intermediate states of [`ConnectFut`].
88pub(crate) enum ConnectFutState<R: Host> {
89    Resolved(ConnectInfo<R>),
90    Connected(Connection<R, TcpStream>),
91}
92
93impl<R: Host> ConnectFut<R> {
94    fn poll_connect(
95        &mut self,
96        cx: &mut Context<'_>,
97    ) -> Poll<Result<ConnectFutState<R>, ConnectError>> {
98        match self {
99            ConnectFut::Resolve(ref mut fut) => {
100                Pin::new(fut).poll(cx).map_ok(ConnectFutState::Resolved)
101            }
102
103            ConnectFut::Connect(ref mut fut) => {
104                Pin::new(fut).poll(cx).map_ok(ConnectFutState::Connected)
105            }
106        }
107    }
108}
109
110pub struct ConnectServiceResponse<R: Host> {
111    fut: ConnectFut<R>,
112    tcp: TcpConnectorService,
113}
114
115impl<R: Host> Future for ConnectServiceResponse<R> {
116    type Output = Result<Connection<R, TcpStream>, ConnectError>;
117
118    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
119        loop {
120            match ready!(self.fut.poll_connect(cx))? {
121                ConnectFutState::Resolved(res) => {
122                    self.fut = ConnectFut::Connect(self.tcp.call(res));
123                }
124                ConnectFutState::Connected(res) => return Poll::Ready(Ok(res)),
125            }
126        }
127    }
128}