actori_connect/
connect.rs

1use std::collections::{vec_deque, VecDeque};
2use std::fmt;
3use std::iter::{FromIterator, FusedIterator};
4use std::net::SocketAddr;
5
6use either::Either;
7
/// Connect request
///
/// Implemented by values that can name the target of a connection: a host
/// string and, optionally, a port.
pub trait Address: Unpin {
    /// Host name of the request
    fn host(&self) -> &str;

    /// Port of the request, or `None` when the value does not provide one
    fn port(&self) -> Option<u16>;
}
16
17impl Address for String {
18    fn host(&self) -> &str {
19        &self
20    }
21
22    fn port(&self) -> Option<u16> {
23        None
24    }
25}
26
27impl Address for &'static str {
28    fn host(&self) -> &str {
29        self
30    }
31
32    fn port(&self) -> Option<u16> {
33        None
34    }
35}
36
/// Connect request
///
/// Bundles an address-like request value with a fallback port and an optional
/// set of already-resolved socket addresses.
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct Connect<T> {
    // The wrapped request value; host and port are queried through it.
    pub(crate) req: T,
    // Fallback port, returned by `port()` only when `req` reports none.
    pub(crate) port: u16,
    // Pre-resolved addresses: `Left` holds exactly one address, `Right` holds
    // a queue of several, `None` means nothing has been resolved yet.
    pub(crate) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
}
44
45impl<T: Address> Connect<T> {
46    /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
47    pub fn new(req: T) -> Connect<T> {
48        let (_, port) = parse(req.host());
49        Connect {
50            req,
51            port: port.unwrap_or(0),
52            addr: None,
53        }
54    }
55
56    /// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages.
57    pub fn with(req: T, addr: SocketAddr) -> Connect<T> {
58        Connect {
59            req,
60            port: 0,
61            addr: Some(Either::Left(addr)),
62        }
63    }
64
65    /// Use port if address does not provide one.
66    ///
67    /// By default it set to 0
68    pub fn set_port(mut self, port: u16) -> Self {
69        self.port = port;
70        self
71    }
72
73    /// Use address.
74    pub fn set_addr(mut self, addr: Option<SocketAddr>) -> Self {
75        if let Some(addr) = addr {
76            self.addr = Some(Either::Left(addr));
77        }
78        self
79    }
80
81    /// Use addresses.
82    pub fn set_addrs<I>(mut self, addrs: I) -> Self
83    where
84        I: IntoIterator<Item = SocketAddr>,
85    {
86        let mut addrs = VecDeque::from_iter(addrs);
87        self.addr = if addrs.len() < 2 {
88            addrs.pop_front().map(Either::Left)
89        } else {
90            Some(Either::Right(addrs))
91        };
92        self
93    }
94
95    /// Host name
96    pub fn host(&self) -> &str {
97        self.req.host()
98    }
99
100    /// Port of the request
101    pub fn port(&self) -> u16 {
102        self.req.port().unwrap_or(self.port)
103    }
104
105    /// Preresolved addresses of the request.
106    pub fn addrs(&self) -> ConnectAddrsIter<'_> {
107        let inner = match self.addr {
108            None => Either::Left(None),
109            Some(Either::Left(addr)) => Either::Left(Some(addr)),
110            Some(Either::Right(ref addrs)) => Either::Right(addrs.iter()),
111        };
112
113        ConnectAddrsIter { inner }
114    }
115
116    /// Takes preresolved addresses of the request.
117    pub fn take_addrs(&mut self) -> ConnectTakeAddrsIter {
118        let inner = match self.addr.take() {
119            None => Either::Left(None),
120            Some(Either::Left(addr)) => Either::Left(Some(addr)),
121            Some(Either::Right(addrs)) => Either::Right(addrs.into_iter()),
122        };
123
124        ConnectTakeAddrsIter { inner }
125    }
126}
127
128impl<T: Address> From<T> for Connect<T> {
129    fn from(addr: T) -> Self {
130        Connect::new(addr)
131    }
132}
133
134impl<T: Address> fmt::Display for Connect<T> {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        write!(f, "{}:{}", self.host(), self.port())
137    }
138}
139
/// Iterator over addresses in a [`Connect`](struct.Connect.html) request.
///
/// Borrows from the request and yields each address by value.
#[derive(Clone)]
pub struct ConnectAddrsIter<'a> {
    // `Left` is the zero-or-one address case (the `Option` is emptied once the
    // address has been yielded); `Right` walks a borrowed queue of addresses.
    inner: Either<Option<SocketAddr>, vec_deque::Iter<'a, SocketAddr>>,
}
145
146impl Iterator for ConnectAddrsIter<'_> {
147    type Item = SocketAddr;
148
149    fn next(&mut self) -> Option<Self::Item> {
150        match self.inner {
151            Either::Left(ref mut opt) => opt.take(),
152            Either::Right(ref mut iter) => iter.next().copied(),
153        }
154    }
155
156    fn size_hint(&self) -> (usize, Option<usize>) {
157        match self.inner {
158            Either::Left(Some(_)) => (1, Some(1)),
159            Either::Left(None) => (0, Some(0)),
160            Either::Right(ref iter) => iter.size_hint(),
161        }
162    }
163}
164
165impl fmt::Debug for ConnectAddrsIter<'_> {
166    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167        f.debug_list().entries(self.clone()).finish()
168    }
169}
170
// `size_hint` reports identical lower and upper bounds in the single-address
// case and defers to the `VecDeque` iterator otherwise.
impl ExactSizeIterator for ConnectAddrsIter<'_> {}

// After the last address, `next` keeps returning `None`: the single-address
// slot stays empty once taken, and the `VecDeque` iterator is itself fused.
impl FusedIterator for ConnectAddrsIter<'_> {}
174
/// Owned iterator over addresses in a [`Connect`](struct.Connect.html) request.
///
/// Holds the addresses itself rather than borrowing them from the request.
#[derive(Debug)]
pub struct ConnectTakeAddrsIter {
    // `Left` is the zero-or-one address case (the `Option` is emptied once the
    // address has been yielded); `Right` drains an owned queue of addresses.
    inner: Either<Option<SocketAddr>, vec_deque::IntoIter<SocketAddr>>,
}
180
181impl Iterator for ConnectTakeAddrsIter {
182    type Item = SocketAddr;
183
184    fn next(&mut self) -> Option<Self::Item> {
185        match self.inner {
186            Either::Left(ref mut opt) => opt.take(),
187            Either::Right(ref mut iter) => iter.next(),
188        }
189    }
190
191    fn size_hint(&self) -> (usize, Option<usize>) {
192        match self.inner {
193            Either::Left(Some(_)) => (1, Some(1)),
194            Either::Left(None) => (0, Some(0)),
195            Either::Right(ref iter) => iter.size_hint(),
196        }
197    }
198}
199
// `size_hint` reports identical lower and upper bounds in the single-address
// case and defers to the `VecDeque` iterator otherwise.
impl ExactSizeIterator for ConnectTakeAddrsIter {}

// After the last address, `next` keeps returning `None`: the single-address
// slot stays empty once taken, and the `VecDeque` iterator is itself fused.
impl FusedIterator for ConnectTakeAddrsIter {}
203
/// Splits `host` at its first ':' into a host part and an optional port.
///
/// Returns the text before the first ':' (the whole input when there is no
/// ':') together with `Some(port)` if the text after it parses as a `u16`,
/// or `None` if that text is missing, empty, non-numeric or out of range.
///
/// Only the first ':' is considered, so an input such as `"a:b:80"` yields
/// `("a", None)`.
fn parse(host: &str) -> (&str, Option<u16>) {
    let mut parts = host.splitn(2, ':');
    // `splitn` yields at least one item even for an empty input, so the
    // fallback to `host` is purely defensive.
    let name = parts.next().unwrap_or(host);
    let port = parts.next().and_then(|raw| raw.parse::<u16>().ok());
    (name, port)
}
217
/// Pairs an io object with the request value it is associated with.
///
/// `T` is the request type and `U` is the io (stream) type.
pub struct Connection<T, U> {
    // The underlying io object; exposed through `get_ref`, `get_mut` and `Deref`.
    io: U,
    // The request value kept alongside the io object.
    req: T,
}
222
223impl<T, U> Connection<T, U> {
224    pub fn new(io: U, req: T) -> Self {
225        Self { io, req }
226    }
227}
228
229impl<T, U> Connection<T, U> {
230    /// Reconstruct from a parts.
231    pub fn from_parts(io: U, req: T) -> Self {
232        Self { io, req }
233    }
234
235    /// Deconstruct into a parts.
236    pub fn into_parts(self) -> (U, T) {
237        (self.io, self.req)
238    }
239
240    /// Replace inclosed object, return new Stream and old object
241    pub fn replace<Y>(self, io: Y) -> (U, Connection<T, Y>) {
242        (self.io, Connection { io, req: self.req })
243    }
244
245    /// Returns a shared reference to the underlying stream.
246    pub fn get_ref(&self) -> &U {
247        &self.io
248    }
249
250    /// Returns a mutable reference to the underlying stream.
251    pub fn get_mut(&mut self) -> &mut U {
252        &mut self.io
253    }
254}
255
256impl<T: Address, U> Connection<T, U> {
257    /// Get request
258    pub fn host(&self) -> &str {
259        &self.req.host()
260    }
261}
262
263impl<T, U> std::ops::Deref for Connection<T, U> {
264    type Target = U;
265
266    fn deref(&self) -> &U {
267        &self.io
268    }
269}
270
271impl<T, U> std::ops::DerefMut for Connection<T, U> {
272    fn deref_mut(&mut self) -> &mut U {
273        &mut self.io
274    }
275}
276
277impl<T, U: fmt::Debug> fmt::Debug for Connection<T, U> {
278    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
279        write!(f, "Stream {{{:?}}}", self.io)
280    }
281}