ns_router/
router.rs

1use std::fmt;
2use std::sync::Arc;
3
4use abstract_ns::{Name, Resolve, HostResolve, Subscribe, HostSubscribe};
5use abstract_ns::{Address, Error};
6use futures::{Stream, Future};
7use futures::future::{empty};
8use futures::stream::{once};
9use futures::sync::oneshot;
10use futures::sync::mpsc::{unbounded, UnboundedSender};
11use tokio_core::reactor::Handle;
12use void::Void;
13
14use config::Config;
15use coroutine::{ResolverFuture};
16use future::{AddrStream, ResolveFuture, HostStream, ResolveHostFuture};
17use future::{UpdateSink};
18use internal::{fail, Request};
19use multisubscr::MultiSubscr;
20use name::{AutoName, InternalName, IntoNameIter};
21use async_slot as slot;
22use subscr::Wrapper;
23
24/// An actual router class
25///
26/// Note: when router is shut down (when config stream is closed), all futures
27/// and subscriptions are canceled.
28///
29/// *Since 0.1.3*: when all instances of Router are dropped the underlying
30/// futures and streams continue to work, until all of them finish (for
31/// subscriptions it means subscriber is dropped). In previous versions this
32/// dropped all requests immediately. This was common source of confusion and
33/// we consider this a bug. If you want to force close all futures and
34/// subscriptions create a router with `from_stream` or `updating_config` and
35/// send EOS on stream or drop `UpdatingSink` respectively.
36#[derive(Debug, Clone)]
37pub struct Router {
38    requests: UnboundedSender<Request>,
39}
40
41
42impl Router {
43
44    /// Create a router for a static config
45    pub fn from_config(config: &Arc<Config>, handle: &Handle) -> Router {
46        let (tx, rx) = unbounded();
47        handle.spawn(ResolverFuture::new(
48            once(Ok(config.clone())).chain(empty().into_stream()),
49            rx, &handle));
50        Router {
51            requests: tx,
52        }
53    }
54
55    /// Create a router with updating config
56    ///
57    /// Note: router is defunctional until first config is received in a
58    /// stream. By defunctional we mean that every request will wait, until
59    /// configured.
60    ///
61    /// Note 2: when stream is closed router is shut down, so usually the
62    /// stream must be infinite.
63    pub fn from_stream<S>(stream: S, handle: &Handle) -> Router
64        where S: Stream<Item=Arc<Config>, Error=Void> + 'static
65    {
66        let (tx, rx) = unbounded();
67        handle.spawn(ResolverFuture::new(stream, rx, &handle));
68        Router {
69            requests: tx,
70        }
71    }
72
73    /// Create a router and update channel
74    ///
75    /// Note: router is shut down when `UpdateSink` is dropped. So keep
76    /// it somewhere so you can update config.
77    pub fn updating_config(config: &Arc<Config>, handle: &Handle)
78        -> (Router, UpdateSink)
79    {
80        let (ctx, crx) = slot::channel();
81        let stream = once(Ok(config.clone())).chain(crx)
82            .map_err(|_| unreachable!());
83        let (tx, rx) = unbounded();
84        handle.spawn(ResolverFuture::new(stream, rx, &handle));
85        return (
86            Router {
87                requests: tx,
88            },
89            UpdateSink(ctx),
90        );
91    }
92
93    pub(crate) fn _subscribe_stream<S>(&self,
94        stream: S, tx: slot::Sender<Address>)
95        where S: Stream<Item=Vec<InternalName>> + Send + 'static,
96              S::Error: fmt::Display,
97    {
98        self.requests.unbounded_send(
99            Request::Task(Wrapper::wrap_send(MultiSubscr::new(stream, tx))))
100            // can't do anything when resolver is down, (no error in stream)
101            // but this will shut down stream which will be visible
102            // for the appplication, which is probably shutting down anyway
103            .map_err(|_| debug!("Stream subscription when resolver is down"))
104            .ok();
105    }
106
107    /// Subscribes to a list of names
108    ///
109    /// This is intended to keep list of services in configuration file,
110    /// like this (yaml):
111    ///
112    /// ```yaml
113    /// addresses:
114    /// - example.org:8080
115    /// - _my._svc.example.org  # SVC record
116    /// - example.net           # default port
117    /// ```
118    ///
119    /// You can also specify a way to resolve the service by providing
120    /// iterator over `AutoName` instances instead of plain `&str` (both are
121    /// accepted in this method).
122    pub fn subscribe_many<'x, I>(&self, iter: I, default_port: u16)
123        -> AddrStream
124        where I: IntoIterator,
125              I::Item: Into<AutoName<'x>>,
126    {
127        let (tx, rx) = slot::channel();
128        let mut lst = Vec::new();
129        for addr in iter {
130            match addr.into().parse(default_port) {
131                Ok(x) => lst.push(x),
132                Err(e) => {
133                    warn!("Error parsing name: {}", e);
134                }
135            }
136        }
137        self._subscribe_stream(
138            once(Ok::<_, Void>(lst)).chain(empty().into_stream()), tx);
139        AddrStream(rx)
140    }
141
142    /// Subscribes to a stream that yields lists of names
143    ///
144    /// See the description of [`subscribe_many`](#tymethod.subscribe_many)
145    /// for the description of the list of names that must be yielded from
146    /// the stream.
147    ///
148    /// Note: this is meant for configuration update scenario. I.e. when
149    /// configuration is reloaded and new list of names is received, it
150    /// should be pushed to this stream. The items received in the stream
151    /// are non-cumulative and replace previous list.
152    ///
153    /// Note 2: If stream is errored or end-of-stream reached, this means
154    /// name is not needed any more and its `AddrStream` will be shut down,
155    /// presumably shutting down everything that depends on it.
156    #[deprecated(since="0.1.1", note="use subscribe_stream()")]
157    pub fn subscribe_many_stream<'x, S>(&self, stream: S, default_port: u16)
158        -> AddrStream
159        where S: Stream + Send + 'static,
160              S::Item: IntoIterator,
161              S::Error: fmt::Display,
162              <S::Item as IntoIterator>::Item: Into<AutoName<'x>>,
163    {
164        let (tx, rx) = slot::channel();
165        self._subscribe_stream(stream.map(move |iter| {
166            let mut lst = Vec::new();
167            for addr in iter {
168                match addr.into().parse(default_port) {
169                    Ok(x) => lst.push(x),
170                    Err(e) => {
171                        warn!("Error parsing name: {}", e);
172                    }
173                }
174            }
175            lst
176        }), tx);
177        AddrStream(rx)
178    }
179    /// Subscribes to a stream that yields lists of names
180    ///
181    /// See the description of [`subscribe_many`](#tymethod.subscribe_many)
182    /// for the description of the list of names that must be yielded from
183    /// the stream.
184    ///
185    /// Note: this is meant for configuration update scenario. I.e. when
186    /// configuration is reloaded and new list of names is received, it
187    /// should be pushed to this stream. The items received in the stream
188    /// are non-cumulative and replace previous list.
189    ///
190    /// Note 2: If stream is errored or end-of-stream reached, this means
191    /// name is not needed any more and its `AddrStream` will be shut down,
192    /// presumably shutting down everything that depends on it.
193    pub fn subscribe_stream<S>(&self, stream: S, default_port: u16)
194        -> AddrStream
195        where S: Stream + Send + 'static,
196              S::Error: fmt::Display,
197              for<'x> S::Item: IntoNameIter<'x>,
198    {
199        let (tx, rx) = slot::channel();
200        self._subscribe_stream(stream.map(move |iter| {
201            let mut lst = Vec::new();
202            for addr in iter.into_name_iter() {
203                match addr.into().parse(default_port) {
204                    Ok(x) => lst.push(x),
205                    Err(e) => {
206                        warn!("Error parsing name: {}", e);
207                        //unimplemented!();
208                    }
209                }
210            }
211            lst
212        }), tx);
213        AddrStream(rx)
214    }
215
216    /// Resolve a string or other things into an address
217    ///
218    /// See description of [`subscribe_many`] to find out how names are parsed
219    ///
220    /// See [`AutoName`] for supported types
221    ///
222    /// [`subscribe_many`]: #tymethod.subscribe_many
223    /// [`AutoName`]:
224    pub fn resolve_auto<'x, N: Into<AutoName<'x>>>(&self,
225        name: N, default_port: u16)
226        -> ResolveFuture
227    {
228        let (tx, rx) = oneshot::channel();
229        match name.into().parse(default_port) {
230            Ok(InternalName::HostPort(name, port)) => {
231                match self.requests.unbounded_send(
232                    Request::ResolveHostPort(name.clone(), port, tx))
233                {
234                    Ok(()) => {}
235                    Err(e) => match e.into_inner() {
236                        Request::ResolveHostPort(name, _, tx) => {
237                            fail(&name, tx, Error::TemporaryError(
238                                "Resolver is down".into()));
239                        }
240                        _ => unreachable!(),
241                    }
242                }
243            }
244            Ok(InternalName::Service(name)) => {
245                match self.requests.unbounded_send(
246                    Request::Resolve(name.clone(), tx))
247                {
248                    Ok(()) => {}
249                    Err(e) => match e.into_inner() {
250                        Request::Resolve(name, tx) => {
251                            fail(&name, tx, Error::TemporaryError(
252                                "Resolver is down".into()));
253                        }
254                        _ => unreachable!(),
255                    }
256                }
257            }
258            Ok(InternalName::Addr(addr)) => {
259                tx.send(Ok(addr.into())).ok();
260            }
261            Err(e) => {
262                tx.send(Err(e.into())).ok();
263            }
264        }
265        ResolveFuture(rx)
266    }
267
268}
269
270impl HostResolve for Router {
271    type HostFuture = ResolveHostFuture;
272    fn resolve_host(&self, name: &Name) -> ResolveHostFuture {
273        let (tx, rx) = oneshot::channel();
274        match self.requests.unbounded_send(
275            Request::ResolveHost(name.clone(), tx))
276        {
277            Ok(()) => {}
278            Err(e) => match e.into_inner() {
279                Request::ResolveHost(name, tx) => {
280                    fail(&name, tx, Error::TemporaryError(
281                        "Resolver is down".into()));
282                }
283                _ => unreachable!(),
284            }
285        }
286        ResolveHostFuture(rx)
287    }
288}
289
290impl Resolve for Router {
291    type Future = ResolveFuture;
292    fn resolve(&self, name: &Name) -> ResolveFuture {
293        let (tx, rx) = oneshot::channel();
294        match self.requests.unbounded_send(
295            Request::Resolve(name.clone(), tx))
296        {
297            Ok(()) => {}
298            Err(e) => match e.into_inner() {
299                Request::Resolve(name, tx) => {
300                    fail(&name, tx, Error::TemporaryError(
301                        "Resolver is down".into()));
302                }
303                _ => unreachable!(),
304            }
305        }
306        ResolveFuture(rx)
307    }
308
309}
310
311impl HostSubscribe for Router {
312    type HostError = Void;
313    type HostStream = HostStream;
314    fn subscribe_host(&self, name: &Name) -> HostStream {
315        let (tx, rx) = slot::channel();
316        self.requests.unbounded_send(
317            Request::HostSubscribe(name.clone(), tx))
318            // can't do anything when resolver is down, (no error in stream)
319            // but this will shut down stream which will be visible
320            // for the appplication, which is probably shutting down anyway
321            .map_err(|_| debug!("Subscription for {} when resolver is down",
322                name))
323            .ok();
324        HostStream(rx)
325    }
326}
327
328impl Subscribe for Router {
329    type Error = Void;
330    type Stream = AddrStream;
331    fn subscribe(&self, name: &Name) -> AddrStream {
332        let (tx, rx) = slot::channel();
333        self.requests.unbounded_send(
334            Request::Subscribe(name.clone(), tx))
335            // can't do anything when resolver is down, (no error in stream)
336            // but this will shut down stream which will be visible
337            // for the appplication, which is probably shutting down anyway
338            .map_err(|_| debug!("Subscription for {} when resolver is down",
339                name))
340            .ok();
341        AddrStream(rx)
342    }
343}
344
#[cfg(test)]
#[allow(dead_code)]
mod type_test {
    // Nothing in this module is executed: the functions only have to
    // type-check, which pins down the argument types accepted by
    // `subscribe_many` and `subscribe_stream`.
    use std::iter::Map;
    use std::slice::Iter;
    use std::sync::Arc;
    use futures::Stream;
    use name::AutoName;
    use super::Router;

    // A borrowed vector of owned strings
    fn test_vec_string(router: &Router, names: Vec<String>) {
        drop(router.subscribe_many(&names, 1));
    }

    // A borrowed vector of string slices
    fn test_vec_str(router: &Router, names: Vec<&str>) {
        drop(router.subscribe_many(&names, 1));
    }

    // An owned vector of `AutoName` values
    fn test_vec_auto(router: &Router, names: Vec<AutoName>) {
        drop(router.subscribe_many(names, 1));
    }

    // An iterator adapter producing `AutoName` values
    fn test_map_auto(router: &Router, names: Vec<&str>) {
        let auto_names = names.into_iter().map(AutoName::Auto);
        drop(router.subscribe_many(auto_names, 1));
    }

    // User-defined name type that converts into `AutoName` by reference
    #[derive(Debug)]
    enum MyName {
        Auto(String),
        Service(String),
    }

    // Container whose by-reference iterator yields `AutoName` directly
    #[derive(Debug)]
    struct List1(Arc<Vec<MyName>>);

    // Container whose by-reference iterator yields `&MyName`
    #[derive(Debug)]
    struct List2(Arc<Vec<MyName>>);

    impl<'a> IntoIterator for &'a List1 {
        type Item = AutoName<'a>;
        type IntoIter = Map<Iter<'a, MyName>,
                            fn(&'a MyName) -> AutoName<'a>>;
        fn into_iter(self) -> Self::IntoIter {
            self.0.iter().map(|my_name| my_name.into())
        }
    }

    impl<'a> IntoIterator for &'a List2 {
        type Item = &'a MyName;
        type IntoIter = Iter<'a, MyName>;
        fn into_iter(self) -> Self::IntoIter {
            self.0.iter()
        }
    }

    impl<'a> Into<AutoName<'a>> for &'a MyName {
        fn into(self) -> AutoName<'a> {
            match self {
                &MyName::Auto(ref text) => AutoName::Auto(text),
                &MyName::Service(ref text) => AutoName::Service(text),
            }
        }
    }

    // Stream of plain string lists
    fn test_subscribe_vec<S>(router: &Router, stream: S)
        where S: Stream<Item=Vec<String>, Error=String> + Send + 'static
    {
        drop(router.subscribe_stream(stream, 80));
    }

    // Stream of lists of the user-defined name type
    fn test_subscribe_vec_custom<S>(router: &Router, stream: S)
        where S: Stream<Item=Vec<MyName>, Error=String> + Send + 'static
    {
        drop(router.subscribe_stream(stream, 80));
    }

    // Stream of containers iterating as `AutoName`
    fn test_subscribe_arc_vec1<S>(router: &Router, stream: S)
        where S: Stream<Item=List1, Error=String> + Send + 'static
    {
        drop(router.subscribe_stream(stream, 80));
    }

    // Stream of containers iterating as `&MyName`
    fn test_subscribe_arc_vec2<S>(router: &Router, stream: S)
        where S: Stream<Item=List2, Error=String> + Send + 'static
    {
        drop(router.subscribe_stream(stream, 80));
    }
}