actix_connect/
connect.rs

1use std::collections::{vec_deque, VecDeque};
2use std::fmt;
3use std::iter::{FromIterator, FusedIterator};
4use std::net::SocketAddr;
5
6use either::Either;
7
/// Address of a connect request: a host name plus an optional port.
///
/// Implementors must be `Unpin + 'static`.
pub trait Address: Unpin + 'static {
    /// Host name of the request
    fn host(&self) -> &str;

    /// Port of the request, or `None` when the address does not carry one.
    fn port(&self) -> Option<u16>;
}
16
17impl Address for String {
18    fn host(&self) -> &str {
19        &self
20    }
21
22    fn port(&self) -> Option<u16> {
23        None
24    }
25}
26
27impl Address for &'static str {
28    fn host(&self) -> &str {
29        self
30    }
31
32    fn port(&self) -> Option<u16> {
33        None
34    }
35}
36
/// Connect request
///
/// Bundles the caller's request value with a fallback port and, optionally, socket addresses
/// that are already known for it.
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct Connect<T> {
    /// The request value supplied by the caller.
    pub(crate) req: T,
    /// Fallback port; `Connect::port` reports it when `req.port()` is `None`.
    pub(crate) port: u16,
    /// Pre-resolved addresses: `Left` holds a single address, `Right` a queue of addresses
    /// (`set_addrs` uses `Right` only for two or more).
    pub(crate) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
}
44
45impl<T: Address> Connect<T> {
46    /// Create `Connect` instance by splitting the string by ':' and convert the second part to u16
47    pub fn new(req: T) -> Connect<T> {
48        let (_, port) = parse(req.host());
49        Connect {
50            req,
51            port: port.unwrap_or(0),
52            addr: None,
53        }
54    }
55
56    /// Create new `Connect` instance from host and address. Connector skips name resolution stage
57    /// for such connect messages.
58    pub fn with(req: T, addr: SocketAddr) -> Connect<T> {
59        Connect {
60            req,
61            port: 0,
62            addr: Some(Either::Left(addr)),
63        }
64    }
65
66    /// Use port if address does not provide one.
67    ///
68    /// By default it set to 0
69    pub fn set_port(mut self, port: u16) -> Self {
70        self.port = port;
71        self
72    }
73
74    /// Use address.
75    pub fn set_addr(mut self, addr: Option<SocketAddr>) -> Self {
76        if let Some(addr) = addr {
77            self.addr = Some(Either::Left(addr));
78        }
79        self
80    }
81
82    /// Use addresses.
83    pub fn set_addrs<I>(mut self, addrs: I) -> Self
84    where
85        I: IntoIterator<Item = SocketAddr>,
86    {
87        let mut addrs = VecDeque::from_iter(addrs);
88        self.addr = if addrs.len() < 2 {
89            addrs.pop_front().map(Either::Left)
90        } else {
91            Some(Either::Right(addrs))
92        };
93        self
94    }
95
96    /// Host name
97    pub fn host(&self) -> &str {
98        self.req.host()
99    }
100
101    /// Port of the request
102    pub fn port(&self) -> u16 {
103        self.req.port().unwrap_or(self.port)
104    }
105
106    /// Pre-resolved addresses of the request.
107    pub fn addrs(&self) -> ConnectAddrsIter<'_> {
108        let inner = match self.addr {
109            None => Either::Left(None),
110            Some(Either::Left(addr)) => Either::Left(Some(addr)),
111            Some(Either::Right(ref addrs)) => Either::Right(addrs.iter()),
112        };
113
114        ConnectAddrsIter { inner }
115    }
116
117    /// Takes pre-resolved addresses of the request.
118    pub fn take_addrs(&mut self) -> ConnectTakeAddrsIter {
119        let inner = match self.addr.take() {
120            None => Either::Left(None),
121            Some(Either::Left(addr)) => Either::Left(Some(addr)),
122            Some(Either::Right(addrs)) => Either::Right(addrs.into_iter()),
123        };
124
125        ConnectTakeAddrsIter { inner }
126    }
127}
128
129impl<T: Address> From<T> for Connect<T> {
130    fn from(addr: T) -> Self {
131        Connect::new(addr)
132    }
133}
134
135impl<T: Address> fmt::Display for Connect<T> {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        write!(f, "{}:{}", self.host(), self.port())
138    }
139}
140
/// Iterator over addresses in a [`Connect`](struct.Connect.html) request.
///
/// Yields `SocketAddr` values by copy; the queue variant only borrows from the request.
#[derive(Clone)]
pub struct ConnectAddrsIter<'a> {
    // `Left` covers the zero-or-one address case; `Right` borrows a queue of addresses.
    inner: Either<Option<SocketAddr>, vec_deque::Iter<'a, SocketAddr>>,
}
146
147impl Iterator for ConnectAddrsIter<'_> {
148    type Item = SocketAddr;
149
150    fn next(&mut self) -> Option<Self::Item> {
151        match self.inner {
152            Either::Left(ref mut opt) => opt.take(),
153            Either::Right(ref mut iter) => iter.next().copied(),
154        }
155    }
156
157    fn size_hint(&self) -> (usize, Option<usize>) {
158        match self.inner {
159            Either::Left(Some(_)) => (1, Some(1)),
160            Either::Left(None) => (0, Some(0)),
161            Either::Right(ref iter) => iter.size_hint(),
162        }
163    }
164}
165
166impl fmt::Debug for ConnectAddrsIter<'_> {
167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168        f.debug_list().entries(self.clone()).finish()
169    }
170}
171
// `size_hint` for this iterator always returns matching lower and upper bounds, so its length
// is known exactly.
impl ExactSizeIterator for ConnectAddrsIter<'_> {}
173
// Once exhausted, `next` keeps returning `None`: the `Option` slot stays empty after `take`,
// and the queue case delegates to `vec_deque::Iter`.
impl FusedIterator for ConnectAddrsIter<'_> {}
175
/// Owned iterator over addresses in a [`Connect`](struct.Connect.html) request.
///
/// Produced by `Connect::take_addrs`, which moves the stored addresses out of the request.
#[derive(Debug)]
pub struct ConnectTakeAddrsIter {
    // `Left` covers the zero-or-one address case; `Right` owns a queue of addresses.
    inner: Either<Option<SocketAddr>, vec_deque::IntoIter<SocketAddr>>,
}
181
182impl Iterator for ConnectTakeAddrsIter {
183    type Item = SocketAddr;
184
185    fn next(&mut self) -> Option<Self::Item> {
186        match self.inner {
187            Either::Left(ref mut opt) => opt.take(),
188            Either::Right(ref mut iter) => iter.next(),
189        }
190    }
191
192    fn size_hint(&self) -> (usize, Option<usize>) {
193        match self.inner {
194            Either::Left(Some(_)) => (1, Some(1)),
195            Either::Left(None) => (0, Some(0)),
196            Either::Right(ref iter) => iter.size_hint(),
197        }
198    }
199}
200
// `size_hint` for this iterator always returns matching lower and upper bounds, so its length
// is known exactly.
impl ExactSizeIterator for ConnectTakeAddrsIter {}
202
// Once exhausted, `next` keeps returning `None`: the `Option` slot stays empty after `take`,
// and the queue case delegates to `vec_deque::IntoIter`.
impl FusedIterator for ConnectTakeAddrsIter {}
204
/// Splits `host` at its first `':'` into a name and an optional port.
///
/// The port is `Some` only when the text after the first colon parses as a `u16`; a missing,
/// empty, non-numeric or out-of-range port yields `None`. The returned name is everything
/// before the first colon (or the whole input when there is no colon).
fn parse(host: &str) -> (&str, Option<u16>) {
    let mut pieces = host.splitn(2, ':');
    // `splitn` always yields at least one piece, so the fallback to `host` is a formality.
    let name = pieces.next().unwrap_or(host);
    let port = pieces.next().and_then(|text| text.parse::<u16>().ok());
    (name, port)
}
218
/// An I/O object of type `U` bundled with the request of type `T` it belongs to.
pub struct Connection<T, U> {
    // The underlying stream; exposed through `get_ref`, `get_mut`, `Deref` and `DerefMut`.
    io: U,
    // The request value; `host()` reads from it when `T: Address`.
    req: T,
}
223
224impl<T, U> Connection<T, U> {
225    pub fn new(io: U, req: T) -> Self {
226        Self { io, req }
227    }
228}
229
230impl<T, U> Connection<T, U> {
231    /// Reconstruct from a parts.
232    pub fn from_parts(io: U, req: T) -> Self {
233        Self { io, req }
234    }
235
236    /// Deconstruct into a parts.
237    pub fn into_parts(self) -> (U, T) {
238        (self.io, self.req)
239    }
240
241    /// Replace inclosed object, return new Stream and old object
242    pub fn replace<Y>(self, io: Y) -> (U, Connection<T, Y>) {
243        (self.io, Connection { io, req: self.req })
244    }
245
246    /// Returns a shared reference to the underlying stream.
247    pub fn get_ref(&self) -> &U {
248        &self.io
249    }
250
251    /// Returns a mutable reference to the underlying stream.
252    pub fn get_mut(&mut self) -> &mut U {
253        &mut self.io
254    }
255}
256
257impl<T: Address, U> Connection<T, U> {
258    /// Get request
259    pub fn host(&self) -> &str {
260        &self.req.host()
261    }
262}
263
264impl<T, U> std::ops::Deref for Connection<T, U> {
265    type Target = U;
266
267    fn deref(&self) -> &U {
268        &self.io
269    }
270}
271
272impl<T, U> std::ops::DerefMut for Connection<T, U> {
273    fn deref_mut(&mut self) -> &mut U {
274        &mut self.io
275    }
276}
277
278impl<T, U: fmt::Debug> fmt::Debug for Connection<T, U> {
279    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280        write!(f, "Stream {{{:?}}}", self.io)
281    }
282}