abstract_ns/
combinators.rs

1//! A number of combinators returned by methods on traits
2use futures::{Async, Future, Stream};
3use futures::future::{FutureResult, err};
4use {Name, Address, IpList, Error};
5use {Resolve, Subscribe, HostResolve, HostSubscribe};
6
/// Stream produced by subscribing through a `FrozenSubscriber`
///
/// It wraps a single future: the future's value is yielded as the one and
/// only item, after which the stream stays not-ready indefinitely. The stream
/// deliberately never terminates, because by convention the end of a name
/// stream shuts down the consumer.
#[derive(Debug)]
pub struct StreamOnce<F> {
    /// The wrapped future; cleared to `None` after the stream has yielded
    /// its item.
    future: Option<F>,
}
16
/// Subscriber whose result is resolved a single time and never refreshed
///
/// Obtain one via `Resolve::frozen_subscriber`.
#[derive(Debug)]
pub struct FrozenSubscriber<R> {
    /// The wrapped resolver that performs the actual lookups.
    pub(crate) resolver: R,
}
24
/// Adapter implementing both `Resolve` and `HostResolve` whose `resolve`
/// always fails with `NameNotFound`
///
/// Host name lookups are forwarded to the wrapped resolver. This lets a
/// resolver that can only resolve host names be added to the router.
///
/// Obtain one via `HostResolve::null_service_resolver`.
#[derive(Debug)]
pub struct NullResolver<R> {
    /// The wrapped resolver that serves host name lookups.
    pub(crate) resolver: R,
}
36
/// Adapter implementing both `Resolve` and `HostResolve` whose
/// `resolve_host` always fails with `NameNotFound`
///
/// Service lookups are forwarded to the wrapped resolver. This lets a
/// resolver that can only resolve services be added to the router.
///
/// Obtain one via `Resolve::null_host_resolver`.
#[derive(Debug)]
pub struct NullHostResolver<R> {
    /// The wrapped resolver that serves service lookups.
    pub(crate) resolver: R,
}
48
49impl<F: Future> Stream for StreamOnce<F> {
50    type Item = F::Item;
51    type Error = F::Error;
52    fn poll(&mut self) -> Result<Async<Option<F::Item>>, F::Error> {
53        let result = match self.future.as_mut() {
54            Some(f) => {
55                match f.poll()? {
56                    Async::Ready(v) => v,
57                    Async::NotReady => return Ok(Async::NotReady),
58                }
59            }
60            None => return Ok(Async::NotReady),
61        };
62        self.future = None;
63        return Ok(Async::Ready(Some(result)));
64    }
65}
66
67impl<R: Resolve> Resolve for NullHostResolver<R> {
68    type Future = R::Future;
69    fn resolve(&self, name: &Name) -> Self::Future {
70        self.resolver.resolve(name)
71    }
72}
73
74impl<R> Resolve for NullResolver<R> {
75    type Future = FutureResult<Address, Error>;
76    fn resolve(&self, _name: &Name) -> Self::Future {
77        err(Error::NameNotFound)
78    }
79}
80
81impl<R: Resolve> Resolve for FrozenSubscriber<R> {
82    type Future = R::Future;
83    fn resolve(&self, name: &Name) -> Self::Future {
84        self.resolver.resolve(name)
85    }
86}
87
88impl<R: Resolve> Subscribe for FrozenSubscriber<R> {
89    type Stream = StreamOnce<R::Future>;
90    type Error = <R::Future as Future>::Error;
91    fn subscribe(&self, name: &Name) -> Self::Stream {
92        StreamOnce { future: Some(self.resolve(name)) }
93    }
94}
95
96impl<R: HostResolve> HostResolve for NullResolver<R> {
97    type HostFuture = R::HostFuture;
98    fn resolve_host(&self, name: &Name) -> Self::HostFuture {
99        self.resolver.resolve_host(name)
100    }
101}
102
103impl<R> HostResolve for NullHostResolver<R> {
104    type HostFuture = FutureResult<IpList, Error>;
105    fn resolve_host(&self, _name: &Name) -> Self::HostFuture {
106        err(Error::NameNotFound)
107    }
108}
109
110impl<R: HostResolve> HostResolve for FrozenSubscriber<R> {
111    type HostFuture = R::HostFuture;
112    fn resolve_host(&self, name: &Name) -> Self::HostFuture {
113        self.resolver.resolve_host(name)
114    }
115}
116
117impl<R: HostResolve> HostSubscribe for FrozenSubscriber<R> {
118    type HostStream = StreamOnce<R::HostFuture>;
119    type HostError = <R::HostFuture as Future>::Error;
120    fn subscribe_host(&self, name: &Name) -> Self::HostStream {
121        StreamOnce { future: Some(self.resolve_host(name)) }
122    }
123}