actori_connect/
service.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actori_rt::net::TcpStream;
6use actori_service::{Service, ServiceFactory};
7use either::Either;
8use futures::future::{ok, Ready};
9use trust_dns_resolver::AsyncResolver;
10
11use crate::connect::{Address, Connect, Connection};
12use crate::connector::{TcpConnector, TcpConnectorFactory};
13use crate::error::ConnectError;
14use crate::resolve::{Resolver, ResolverFactory};
15
16pub struct ConnectServiceFactory<T> {
17 tcp: TcpConnectorFactory<T>,
18 resolver: ResolverFactory<T>,
19}
20
21impl<T> ConnectServiceFactory<T> {
22 pub fn new() -> Self {
24 ConnectServiceFactory {
25 tcp: TcpConnectorFactory::default(),
26 resolver: ResolverFactory::default(),
27 }
28 }
29
30 pub fn with_resolver(resolver: AsyncResolver) -> Self {
32 ConnectServiceFactory {
33 tcp: TcpConnectorFactory::default(),
34 resolver: ResolverFactory::new(resolver),
35 }
36 }
37
38 pub fn service(&self) -> ConnectService<T> {
40 ConnectService {
41 tcp: self.tcp.service(),
42 resolver: self.resolver.service(),
43 }
44 }
45
46 pub fn tcp_service(&self) -> TcpConnectService<T> {
48 TcpConnectService {
49 tcp: self.tcp.service(),
50 resolver: self.resolver.service(),
51 }
52 }
53}
54
55impl<T> Default for ConnectServiceFactory<T> {
56 fn default() -> Self {
57 ConnectServiceFactory {
58 tcp: TcpConnectorFactory::default(),
59 resolver: ResolverFactory::default(),
60 }
61 }
62}
63
64impl<T> Clone for ConnectServiceFactory<T> {
65 fn clone(&self) -> Self {
66 ConnectServiceFactory {
67 tcp: self.tcp.clone(),
68 resolver: self.resolver.clone(),
69 }
70 }
71}
72
73impl<T: Address> ServiceFactory for ConnectServiceFactory<T> {
74 type Request = Connect<T>;
75 type Response = Connection<T, TcpStream>;
76 type Error = ConnectError;
77 type Config = ();
78 type Service = ConnectService<T>;
79 type InitError = ();
80 type Future = Ready<Result<Self::Service, Self::InitError>>;
81
82 fn new_service(&self, _: ()) -> Self::Future {
83 ok(self.service())
84 }
85}
86
87#[derive(Clone)]
88pub struct ConnectService<T> {
89 tcp: TcpConnector<T>,
90 resolver: Resolver<T>,
91}
92
93impl<T: Address> Service for ConnectService<T> {
94 type Request = Connect<T>;
95 type Response = Connection<T, TcpStream>;
96 type Error = ConnectError;
97 type Future = ConnectServiceResponse<T>;
98
99 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100 Poll::Ready(Ok(()))
101 }
102
103 fn call(&mut self, req: Connect<T>) -> Self::Future {
104 ConnectServiceResponse {
105 state: ConnectState::Resolve(self.resolver.call(req)),
106 tcp: self.tcp.clone(),
107 }
108 }
109}
110
111enum ConnectState<T: Address> {
112 Resolve(<Resolver<T> as Service>::Future),
113 Connect(<TcpConnector<T> as Service>::Future),
114}
115
116impl<T: Address> ConnectState<T> {
117 fn poll(
118 &mut self,
119 cx: &mut Context<'_>,
120 ) -> Either<Poll<Result<Connection<T, TcpStream>, ConnectError>>, Connect<T>> {
121 match self {
122 ConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
123 Poll::Pending => Either::Left(Poll::Pending),
124 Poll::Ready(Ok(res)) => Either::Right(res),
125 Poll::Ready(Err(err)) => Either::Left(Poll::Ready(Err(err))),
126 },
127 ConnectState::Connect(ref mut fut) => Either::Left(Pin::new(fut).poll(cx)),
128 }
129 }
130}
131
132pub struct ConnectServiceResponse<T: Address> {
133 state: ConnectState<T>,
134 tcp: TcpConnector<T>,
135}
136
137impl<T: Address> Future for ConnectServiceResponse<T> {
138 type Output = Result<Connection<T, TcpStream>, ConnectError>;
139
140 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
141 let res = match self.state.poll(cx) {
142 Either::Right(res) => {
143 self.state = ConnectState::Connect(self.tcp.call(res));
144 self.state.poll(cx)
145 }
146 Either::Left(res) => return res,
147 };
148
149 match res {
150 Either::Left(res) => res,
151 Either::Right(_) => panic!(),
152 }
153 }
154}
155
156#[derive(Clone)]
157pub struct TcpConnectService<T> {
158 tcp: TcpConnector<T>,
159 resolver: Resolver<T>,
160}
161
162impl<T: Address + 'static> Service for TcpConnectService<T> {
163 type Request = Connect<T>;
164 type Response = TcpStream;
165 type Error = ConnectError;
166 type Future = TcpConnectServiceResponse<T>;
167
168 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169 Poll::Ready(Ok(()))
170 }
171
172 fn call(&mut self, req: Connect<T>) -> Self::Future {
173 TcpConnectServiceResponse {
174 state: TcpConnectState::Resolve(self.resolver.call(req)),
175 tcp: self.tcp.clone(),
176 }
177 }
178}
179
180enum TcpConnectState<T: Address> {
181 Resolve(<Resolver<T> as Service>::Future),
182 Connect(<TcpConnector<T> as Service>::Future),
183}
184
185impl<T: Address> TcpConnectState<T> {
186 fn poll(
187 &mut self,
188 cx: &mut Context<'_>,
189 ) -> Either<Poll<Result<TcpStream, ConnectError>>, Connect<T>> {
190 match self {
191 TcpConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
192 Poll::Pending => (),
193 Poll::Ready(Ok(res)) => return Either::Right(res),
194 Poll::Ready(Err(err)) => return Either::Left(Poll::Ready(Err(err))),
195 },
196 TcpConnectState::Connect(ref mut fut) => {
197 if let Poll::Ready(res) = Pin::new(fut).poll(cx) {
198 return match res {
199 Ok(conn) => Either::Left(Poll::Ready(Ok(conn.into_parts().0))),
200 Err(err) => Either::Left(Poll::Ready(Err(err))),
201 };
202 }
203 }
204 }
205 Either::Left(Poll::Pending)
206 }
207}
208
209pub struct TcpConnectServiceResponse<T: Address> {
210 state: TcpConnectState<T>,
211 tcp: TcpConnector<T>,
212}
213
214impl<T: Address> Future for TcpConnectServiceResponse<T> {
215 type Output = Result<TcpStream, ConnectError>;
216
217 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
218 let res = match self.state.poll(cx) {
219 Either::Right(res) => {
220 self.state = TcpConnectState::Connect(self.tcp.call(res));
221 self.state.poll(cx)
222 }
223 Either::Left(res) => return res,
224 };
225
226 match res {
227 Either::Left(res) => res,
228 Either::Right(_) => panic!(),
229 }
230 }
231}