actix_connect/
service.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actix_rt::net::TcpStream;
6use actix_service::{Service, ServiceFactory};
7use either::Either;
8use futures_util::future::{ok, Ready};
9use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
10
11use crate::connect::{Address, Connect, Connection};
12use crate::connector::{TcpConnector, TcpConnectorFactory};
13use crate::error::ConnectError;
14use crate::resolve::{Resolver, ResolverFactory};
15
16pub struct ConnectServiceFactory<T> {
17    tcp: TcpConnectorFactory<T>,
18    resolver: ResolverFactory<T>,
19}
20
21impl<T> ConnectServiceFactory<T> {
22    /// Construct new ConnectService factory
23    pub fn new() -> Self {
24        ConnectServiceFactory {
25            tcp: TcpConnectorFactory::default(),
26            resolver: ResolverFactory::default(),
27        }
28    }
29
30    /// Construct new connect service with custom dns resolver
31    pub fn with_resolver(resolver: AsyncResolver) -> Self {
32        ConnectServiceFactory {
33            tcp: TcpConnectorFactory::default(),
34            resolver: ResolverFactory::new(resolver),
35        }
36    }
37
38    /// Construct new service
39    pub fn service(&self) -> ConnectService<T> {
40        ConnectService {
41            tcp: self.tcp.service(),
42            resolver: self.resolver.service(),
43        }
44    }
45
46    /// Construct new tcp stream service
47    pub fn tcp_service(&self) -> TcpConnectService<T> {
48        TcpConnectService {
49            tcp: self.tcp.service(),
50            resolver: self.resolver.service(),
51        }
52    }
53}
54
55impl<T> Default for ConnectServiceFactory<T> {
56    fn default() -> Self {
57        ConnectServiceFactory {
58            tcp: TcpConnectorFactory::default(),
59            resolver: ResolverFactory::default(),
60        }
61    }
62}
63
64impl<T> Clone for ConnectServiceFactory<T> {
65    fn clone(&self) -> Self {
66        ConnectServiceFactory {
67            tcp: self.tcp.clone(),
68            resolver: self.resolver.clone(),
69        }
70    }
71}
72
73impl<T: Address> ServiceFactory for ConnectServiceFactory<T> {
74    type Request = Connect<T>;
75    type Response = Connection<T, TcpStream>;
76    type Error = ConnectError;
77    type Config = ();
78    type Service = ConnectService<T>;
79    type InitError = ();
80    type Future = Ready<Result<Self::Service, Self::InitError>>;
81
82    fn new_service(&self, _: ()) -> Self::Future {
83        ok(self.service())
84    }
85}
86
87#[derive(Clone)]
88pub struct ConnectService<T> {
89    tcp: TcpConnector<T>,
90    resolver: Resolver<T>,
91}
92
93impl<T: Address> Service for ConnectService<T> {
94    type Request = Connect<T>;
95    type Response = Connection<T, TcpStream>;
96    type Error = ConnectError;
97    type Future = ConnectServiceResponse<T>;
98
99    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100        Poll::Ready(Ok(()))
101    }
102
103    fn call(&mut self, req: Connect<T>) -> Self::Future {
104        ConnectServiceResponse {
105            state: ConnectState::Resolve(self.resolver.call(req)),
106            tcp: self.tcp.clone(),
107        }
108    }
109}
110
111enum ConnectState<T: Address> {
112    Resolve(<Resolver<T> as Service>::Future),
113    Connect(<TcpConnector<T> as Service>::Future),
114}
115
116impl<T: Address> ConnectState<T> {
117    #[allow(clippy::type_complexity)]
118    fn poll(
119        &mut self,
120        cx: &mut Context<'_>,
121    ) -> Either<Poll<Result<Connection<T, TcpStream>, ConnectError>>, Connect<T>> {
122        match self {
123            ConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
124                Poll::Pending => Either::Left(Poll::Pending),
125                Poll::Ready(Ok(res)) => Either::Right(res),
126                Poll::Ready(Err(err)) => Either::Left(Poll::Ready(Err(err))),
127            },
128            ConnectState::Connect(ref mut fut) => Either::Left(Pin::new(fut).poll(cx)),
129        }
130    }
131}
132
133pub struct ConnectServiceResponse<T: Address> {
134    state: ConnectState<T>,
135    tcp: TcpConnector<T>,
136}
137
138impl<T: Address> Future for ConnectServiceResponse<T> {
139    type Output = Result<Connection<T, TcpStream>, ConnectError>;
140
141    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
142        let res = match self.state.poll(cx) {
143            Either::Right(res) => {
144                self.state = ConnectState::Connect(self.tcp.call(res));
145                self.state.poll(cx)
146            }
147            Either::Left(res) => return res,
148        };
149
150        match res {
151            Either::Left(res) => res,
152            Either::Right(_) => panic!(),
153        }
154    }
155}
156
157#[derive(Clone)]
158pub struct TcpConnectService<T> {
159    tcp: TcpConnector<T>,
160    resolver: Resolver<T>,
161}
162
163impl<T: Address + 'static> Service for TcpConnectService<T> {
164    type Request = Connect<T>;
165    type Response = TcpStream;
166    type Error = ConnectError;
167    type Future = TcpConnectServiceResponse<T>;
168
169    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
170        Poll::Ready(Ok(()))
171    }
172
173    fn call(&mut self, req: Connect<T>) -> Self::Future {
174        TcpConnectServiceResponse {
175            state: TcpConnectState::Resolve(self.resolver.call(req)),
176            tcp: self.tcp.clone(),
177        }
178    }
179}
180
181enum TcpConnectState<T: Address> {
182    Resolve(<Resolver<T> as Service>::Future),
183    Connect(<TcpConnector<T> as Service>::Future),
184}
185
186impl<T: Address> TcpConnectState<T> {
187    fn poll(
188        &mut self,
189        cx: &mut Context<'_>,
190    ) -> Either<Poll<Result<TcpStream, ConnectError>>, Connect<T>> {
191        match self {
192            TcpConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
193                Poll::Pending => (),
194                Poll::Ready(Ok(res)) => return Either::Right(res),
195                Poll::Ready(Err(err)) => return Either::Left(Poll::Ready(Err(err))),
196            },
197            TcpConnectState::Connect(ref mut fut) => {
198                if let Poll::Ready(res) = Pin::new(fut).poll(cx) {
199                    return match res {
200                        Ok(conn) => Either::Left(Poll::Ready(Ok(conn.into_parts().0))),
201                        Err(err) => Either::Left(Poll::Ready(Err(err))),
202                    };
203                }
204            }
205        }
206        Either::Left(Poll::Pending)
207    }
208}
209
210pub struct TcpConnectServiceResponse<T: Address> {
211    state: TcpConnectState<T>,
212    tcp: TcpConnector<T>,
213}
214
215impl<T: Address> Future for TcpConnectServiceResponse<T> {
216    type Output = Result<TcpStream, ConnectError>;
217
218    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
219        let res = match self.state.poll(cx) {
220            Either::Right(res) => {
221                self.state = TcpConnectState::Connect(self.tcp.call(res));
222                self.state.poll(cx)
223            }
224            Either::Left(res) => return res,
225        };
226
227        match res {
228            Either::Left(res) => res,
229            Either::Right(_) => panic!(),
230        }
231    }
232}