1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actix_rt::net::TcpStream;
6use actix_service::{Service, ServiceFactory};
7use either::Either;
8use futures_util::future::{ok, Ready};
9use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
10
11use crate::connect::{Address, Connect, Connection};
12use crate::connector::{TcpConnector, TcpConnectorFactory};
13use crate::error::ConnectError;
14use crate::resolve::{Resolver, ResolverFactory};
15
/// Factory for connect services.
///
/// Holds a TCP connector factory and a resolver factory; every service
/// created from this factory receives one service produced by each of them.
pub struct ConnectServiceFactory<T> {
    // Source of the `TcpConnector` given to each created service.
    tcp: TcpConnectorFactory<T>,
    // Source of the `Resolver` given to each created service.
    resolver: ResolverFactory<T>,
}
20
21impl<T> ConnectServiceFactory<T> {
22 pub fn new() -> Self {
24 ConnectServiceFactory {
25 tcp: TcpConnectorFactory::default(),
26 resolver: ResolverFactory::default(),
27 }
28 }
29
30 pub fn with_resolver(resolver: AsyncResolver) -> Self {
32 ConnectServiceFactory {
33 tcp: TcpConnectorFactory::default(),
34 resolver: ResolverFactory::new(resolver),
35 }
36 }
37
38 pub fn service(&self) -> ConnectService<T> {
40 ConnectService {
41 tcp: self.tcp.service(),
42 resolver: self.resolver.service(),
43 }
44 }
45
46 pub fn tcp_service(&self) -> TcpConnectService<T> {
48 TcpConnectService {
49 tcp: self.tcp.service(),
50 resolver: self.resolver.service(),
51 }
52 }
53}
54
55impl<T> Default for ConnectServiceFactory<T> {
56 fn default() -> Self {
57 ConnectServiceFactory {
58 tcp: TcpConnectorFactory::default(),
59 resolver: ResolverFactory::default(),
60 }
61 }
62}
63
64impl<T> Clone for ConnectServiceFactory<T> {
65 fn clone(&self) -> Self {
66 ConnectServiceFactory {
67 tcp: self.tcp.clone(),
68 resolver: self.resolver.clone(),
69 }
70 }
71}
72
73impl<T: Address> ServiceFactory for ConnectServiceFactory<T> {
74 type Request = Connect<T>;
75 type Response = Connection<T, TcpStream>;
76 type Error = ConnectError;
77 type Config = ();
78 type Service = ConnectService<T>;
79 type InitError = ();
80 type Future = Ready<Result<Self::Service, Self::InitError>>;
81
82 fn new_service(&self, _: ()) -> Self::Future {
83 ok(self.service())
84 }
85}
86
/// Service that first calls its resolver with the incoming `Connect<T>` and
/// then passes the resolved request to its TCP connector, responding with a
/// `Connection<T, TcpStream>`.
#[derive(Clone)]
pub struct ConnectService<T> {
    // Connector used for the second (connect) phase of each request.
    tcp: TcpConnector<T>,
    // Resolver used for the first (resolve) phase of each request.
    resolver: Resolver<T>,
}
92
93impl<T: Address> Service for ConnectService<T> {
94 type Request = Connect<T>;
95 type Response = Connection<T, TcpStream>;
96 type Error = ConnectError;
97 type Future = ConnectServiceResponse<T>;
98
99 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100 Poll::Ready(Ok(()))
101 }
102
103 fn call(&mut self, req: Connect<T>) -> Self::Future {
104 ConnectServiceResponse {
105 state: ConnectState::Resolve(self.resolver.call(req)),
106 tcp: self.tcp.clone(),
107 }
108 }
109}
110
/// Phase of an in-flight [`ConnectService`] request.
enum ConnectState<T: Address> {
    /// Waiting on the future returned by the resolver service.
    Resolve(<Resolver<T> as Service>::Future),
    /// Waiting on the future returned by the TCP connector service.
    Connect(<TcpConnector<T> as Service>::Future),
}
115
116impl<T: Address> ConnectState<T> {
117 #[allow(clippy::type_complexity)]
118 fn poll(
119 &mut self,
120 cx: &mut Context<'_>,
121 ) -> Either<Poll<Result<Connection<T, TcpStream>, ConnectError>>, Connect<T>> {
122 match self {
123 ConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
124 Poll::Pending => Either::Left(Poll::Pending),
125 Poll::Ready(Ok(res)) => Either::Right(res),
126 Poll::Ready(Err(err)) => Either::Left(Poll::Ready(Err(err))),
127 },
128 ConnectState::Connect(ref mut fut) => Either::Left(Pin::new(fut).poll(cx)),
129 }
130 }
131}
132
/// Future returned by `ConnectService::call`.
pub struct ConnectServiceResponse<T: Address> {
    // Current phase (resolve or connect) together with its in-flight future.
    state: ConnectState<T>,
    // Connector that is called once the resolve phase yields a request.
    tcp: TcpConnector<T>,
}
137
138impl<T: Address> Future for ConnectServiceResponse<T> {
139 type Output = Result<Connection<T, TcpStream>, ConnectError>;
140
141 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
142 let res = match self.state.poll(cx) {
143 Either::Right(res) => {
144 self.state = ConnectState::Connect(self.tcp.call(res));
145 self.state.poll(cx)
146 }
147 Either::Left(res) => return res,
148 };
149
150 match res {
151 Either::Left(res) => res,
152 Either::Right(_) => panic!(),
153 }
154 }
155}
156
/// Service that first calls its resolver with the incoming `Connect<T>` and
/// then passes the resolved request to its TCP connector, responding with the
/// bare `TcpStream`.
#[derive(Clone)]
pub struct TcpConnectService<T> {
    // Connector used for the second (connect) phase of each request.
    tcp: TcpConnector<T>,
    // Resolver used for the first (resolve) phase of each request.
    resolver: Resolver<T>,
}
162
163impl<T: Address + 'static> Service for TcpConnectService<T> {
164 type Request = Connect<T>;
165 type Response = TcpStream;
166 type Error = ConnectError;
167 type Future = TcpConnectServiceResponse<T>;
168
169 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
170 Poll::Ready(Ok(()))
171 }
172
173 fn call(&mut self, req: Connect<T>) -> Self::Future {
174 TcpConnectServiceResponse {
175 state: TcpConnectState::Resolve(self.resolver.call(req)),
176 tcp: self.tcp.clone(),
177 }
178 }
179}
180
/// Phase of an in-flight [`TcpConnectService`] request.
enum TcpConnectState<T: Address> {
    /// Waiting on the future returned by the resolver service.
    Resolve(<Resolver<T> as Service>::Future),
    /// Waiting on the future returned by the TCP connector service.
    Connect(<TcpConnector<T> as Service>::Future),
}
185
186impl<T: Address> TcpConnectState<T> {
187 fn poll(
188 &mut self,
189 cx: &mut Context<'_>,
190 ) -> Either<Poll<Result<TcpStream, ConnectError>>, Connect<T>> {
191 match self {
192 TcpConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
193 Poll::Pending => (),
194 Poll::Ready(Ok(res)) => return Either::Right(res),
195 Poll::Ready(Err(err)) => return Either::Left(Poll::Ready(Err(err))),
196 },
197 TcpConnectState::Connect(ref mut fut) => {
198 if let Poll::Ready(res) = Pin::new(fut).poll(cx) {
199 return match res {
200 Ok(conn) => Either::Left(Poll::Ready(Ok(conn.into_parts().0))),
201 Err(err) => Either::Left(Poll::Ready(Err(err))),
202 };
203 }
204 }
205 }
206 Either::Left(Poll::Pending)
207 }
208}
209
/// Future returned by `TcpConnectService::call`.
pub struct TcpConnectServiceResponse<T: Address> {
    // Current phase (resolve or connect) together with its in-flight future.
    state: TcpConnectState<T>,
    // Connector that is called once the resolve phase yields a request.
    tcp: TcpConnector<T>,
}
214
215impl<T: Address> Future for TcpConnectServiceResponse<T> {
216 type Output = Result<TcpStream, ConnectError>;
217
218 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
219 let res = match self.state.poll(cx) {
220 Either::Right(res) => {
221 self.state = TcpConnectState::Connect(self.tcp.call(res));
222 self.state.poll(cx)
223 }
224 Either::Left(res) => return res,
225 };
226
227 match res {
228 Either::Left(res) => res,
229 Either::Right(_) => panic!(),
230 }
231 }
232}