actix_connect/
connector.rs1use std::collections::VecDeque;
2use std::future::Future;
3use std::io;
4use std::marker::PhantomData;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use actix_rt::net::TcpStream;
10use actix_service::{Service, ServiceFactory};
11use futures_util::future::{err, ok, BoxFuture, Either, FutureExt, Ready};
12
13use super::connect::{Address, Connect, Connection};
14use super::error::ConnectError;
15
/// Factory that produces [`TcpConnector`] services for the address type `T`.
///
/// The factory carries no runtime state; `T` is only tracked through
/// `PhantomData`, so the value is zero-sized.
#[derive(Debug)]
pub struct TcpConnectorFactory<T>(PhantomData<T>);
19
20impl<T> TcpConnectorFactory<T> {
21 pub fn new() -> Self {
22 TcpConnectorFactory(PhantomData)
23 }
24
25 pub fn service(&self) -> TcpConnector<T> {
27 TcpConnector(PhantomData)
28 }
29}
30
31impl<T> Default for TcpConnectorFactory<T> {
32 fn default() -> Self {
33 TcpConnectorFactory(PhantomData)
34 }
35}
36
37impl<T> Clone for TcpConnectorFactory<T> {
38 fn clone(&self) -> Self {
39 TcpConnectorFactory(PhantomData)
40 }
41}
42
43impl<T: Address> ServiceFactory for TcpConnectorFactory<T> {
44 type Request = Connect<T>;
45 type Response = Connection<T, TcpStream>;
46 type Error = ConnectError;
47 type Config = ();
48 type Service = TcpConnector<T>;
49 type InitError = ();
50 type Future = Ready<Result<Self::Service, Self::InitError>>;
51
52 fn new_service(&self, _: ()) -> Self::Future {
53 ok(self.service())
54 }
55}
56
/// TCP connector service for the address type `T`.
///
/// The connector is stateless; `T` is only tracked through `PhantomData`,
/// so the value is zero-sized.
#[derive(Debug)]
pub struct TcpConnector<T>(PhantomData<T>);

// `Default` is implemented by hand instead of derived: `#[derive(Default)]`
// would add a `T: Default` bound even though no `T` value is ever stored.
// This keeps `TcpConnector::default()` available for every `T`, matching
// `TcpConnector::new()` and `TcpConnectorFactory::default()`.
impl<T> Default for TcpConnector<T> {
    fn default() -> Self {
        TcpConnector(PhantomData)
    }
}
60
61impl<T> TcpConnector<T> {
62 pub fn new() -> Self {
63 TcpConnector(PhantomData)
64 }
65}
66
67impl<T> Clone for TcpConnector<T> {
68 fn clone(&self) -> Self {
69 TcpConnector(PhantomData)
70 }
71}
72
73impl<T: Address> Service for TcpConnector<T> {
74 type Request = Connect<T>;
75 type Response = Connection<T, TcpStream>;
76 type Error = ConnectError;
77 #[allow(clippy::type_complexity)]
78 type Future = Either<TcpConnectorResponse<T>, Ready<Result<Self::Response, Self::Error>>>;
79
80 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81 Poll::Ready(Ok(()))
82 }
83
84 fn call(&mut self, req: Connect<T>) -> Self::Future {
85 let port = req.port();
86 let Connect { req, addr, .. } = req;
87
88 if let Some(addr) = addr {
89 Either::Left(TcpConnectorResponse::new(req, port, addr))
90 } else {
91 error!("TCP connector: got unresolved address");
92 Either::Right(err(ConnectError::Unresolved))
93 }
94 }
95}
96
97#[doc(hidden)]
98pub struct TcpConnectorResponse<T> {
100 req: Option<T>,
101 port: u16,
102 addrs: Option<VecDeque<SocketAddr>>,
103 stream: Option<BoxFuture<'static, Result<TcpStream, io::Error>>>,
104}
105
106impl<T: Address> TcpConnectorResponse<T> {
107 pub fn new(
108 req: T,
109 port: u16,
110 addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
111 ) -> TcpConnectorResponse<T> {
112 trace!(
113 "TCP connector - connecting to {:?} port:{}",
114 req.host(),
115 port
116 );
117
118 match addr {
119 either::Either::Left(addr) => TcpConnectorResponse {
120 req: Some(req),
121 port,
122 addrs: None,
123 stream: Some(TcpStream::connect(addr).boxed()),
124 },
125 either::Either::Right(addrs) => TcpConnectorResponse {
126 req: Some(req),
127 port,
128 addrs: Some(addrs),
129 stream: None,
130 },
131 }
132 }
133}
134
135impl<T: Address> Future for TcpConnectorResponse<T> {
136 type Output = Result<Connection<T, TcpStream>, ConnectError>;
137
138 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139 let this = self.get_mut();
140
141 loop {
143 if let Some(new) = this.stream.as_mut() {
144 match new.as_mut().poll(cx) {
145 Poll::Ready(Ok(sock)) => {
146 let req = this.req.take().unwrap();
147 trace!(
148 "TCP connector - successfully connected to connecting to {:?} - {:?}",
149 req.host(), sock.peer_addr()
150 );
151 return Poll::Ready(Ok(Connection::new(sock, req)));
152 }
153 Poll::Pending => return Poll::Pending,
154 Poll::Ready(Err(err)) => {
155 trace!(
156 "TCP connector - failed to connect to connecting to {:?} port: {}",
157 this.req.as_ref().unwrap().host(),
158 this.port,
159 );
160 if this.addrs.is_none() || this.addrs.as_ref().unwrap().is_empty() {
161 return Poll::Ready(Err(err.into()));
162 }
163 }
164 }
165 }
166
167 let addr = this.addrs.as_mut().unwrap().pop_front().unwrap();
169 this.stream = Some(TcpStream::connect(addr).boxed());
170 }
171 }
172}