actix_connect/
connector.rs

1use std::collections::VecDeque;
2use std::future::Future;
3use std::io;
4use std::marker::PhantomData;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use actix_rt::net::TcpStream;
10use actix_service::{Service, ServiceFactory};
11use futures_util::future::{err, ok, BoxFuture, Either, FutureExt, Ready};
12
13use super::connect::{Address, Connect, Connection};
14use super::error::ConnectError;
15
16/// TCP connector service factory
17#[derive(Debug)]
18pub struct TcpConnectorFactory<T>(PhantomData<T>);
19
20impl<T> TcpConnectorFactory<T> {
21    pub fn new() -> Self {
22        TcpConnectorFactory(PhantomData)
23    }
24
25    /// Create TCP connector service
26    pub fn service(&self) -> TcpConnector<T> {
27        TcpConnector(PhantomData)
28    }
29}
30
31impl<T> Default for TcpConnectorFactory<T> {
32    fn default() -> Self {
33        TcpConnectorFactory(PhantomData)
34    }
35}
36
37impl<T> Clone for TcpConnectorFactory<T> {
38    fn clone(&self) -> Self {
39        TcpConnectorFactory(PhantomData)
40    }
41}
42
43impl<T: Address> ServiceFactory for TcpConnectorFactory<T> {
44    type Request = Connect<T>;
45    type Response = Connection<T, TcpStream>;
46    type Error = ConnectError;
47    type Config = ();
48    type Service = TcpConnector<T>;
49    type InitError = ();
50    type Future = Ready<Result<Self::Service, Self::InitError>>;
51
52    fn new_service(&self, _: ()) -> Self::Future {
53        ok(self.service())
54    }
55}
56
57/// TCP connector service
58#[derive(Default, Debug)]
59pub struct TcpConnector<T>(PhantomData<T>);
60
61impl<T> TcpConnector<T> {
62    pub fn new() -> Self {
63        TcpConnector(PhantomData)
64    }
65}
66
67impl<T> Clone for TcpConnector<T> {
68    fn clone(&self) -> Self {
69        TcpConnector(PhantomData)
70    }
71}
72
73impl<T: Address> Service for TcpConnector<T> {
74    type Request = Connect<T>;
75    type Response = Connection<T, TcpStream>;
76    type Error = ConnectError;
77    #[allow(clippy::type_complexity)]
78    type Future = Either<TcpConnectorResponse<T>, Ready<Result<Self::Response, Self::Error>>>;
79
80    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81        Poll::Ready(Ok(()))
82    }
83
84    fn call(&mut self, req: Connect<T>) -> Self::Future {
85        let port = req.port();
86        let Connect { req, addr, .. } = req;
87
88        if let Some(addr) = addr {
89            Either::Left(TcpConnectorResponse::new(req, port, addr))
90        } else {
91            error!("TCP connector: got unresolved address");
92            Either::Right(err(ConnectError::Unresolved))
93        }
94    }
95}
96
97#[doc(hidden)]
98/// TCP stream connector response future
99pub struct TcpConnectorResponse<T> {
100    req: Option<T>,
101    port: u16,
102    addrs: Option<VecDeque<SocketAddr>>,
103    stream: Option<BoxFuture<'static, Result<TcpStream, io::Error>>>,
104}
105
106impl<T: Address> TcpConnectorResponse<T> {
107    pub fn new(
108        req: T,
109        port: u16,
110        addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
111    ) -> TcpConnectorResponse<T> {
112        trace!(
113            "TCP connector - connecting to {:?} port:{}",
114            req.host(),
115            port
116        );
117
118        match addr {
119            either::Either::Left(addr) => TcpConnectorResponse {
120                req: Some(req),
121                port,
122                addrs: None,
123                stream: Some(TcpStream::connect(addr).boxed()),
124            },
125            either::Either::Right(addrs) => TcpConnectorResponse {
126                req: Some(req),
127                port,
128                addrs: Some(addrs),
129                stream: None,
130            },
131        }
132    }
133}
134
135impl<T: Address> Future for TcpConnectorResponse<T> {
136    type Output = Result<Connection<T, TcpStream>, ConnectError>;
137
138    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139        let this = self.get_mut();
140
141        // connect
142        loop {
143            if let Some(new) = this.stream.as_mut() {
144                match new.as_mut().poll(cx) {
145                    Poll::Ready(Ok(sock)) => {
146                        let req = this.req.take().unwrap();
147                        trace!(
148                            "TCP connector - successfully connected to connecting to {:?} - {:?}",
149                            req.host(), sock.peer_addr()
150                        );
151                        return Poll::Ready(Ok(Connection::new(sock, req)));
152                    }
153                    Poll::Pending => return Poll::Pending,
154                    Poll::Ready(Err(err)) => {
155                        trace!(
156                            "TCP connector - failed to connect to connecting to {:?} port: {}",
157                            this.req.as_ref().unwrap().host(),
158                            this.port,
159                        );
160                        if this.addrs.is_none() || this.addrs.as_ref().unwrap().is_empty() {
161                            return Poll::Ready(Err(err.into()));
162                        }
163                    }
164                }
165            }
166
167            // try to connect
168            let addr = this.addrs.as_mut().unwrap().pop_front().unwrap();
169            this.stream = Some(TcpStream::connect(addr).boxed());
170        }
171    }
172}