ns_router/
subscribe_ext.rs

1//! An extension trait that turns resolvers into subscribers
2use std::fmt;
3use std::time::Duration;
4use std::rc::Rc;
5
6use abstract_ns::{Resolve, HostResolve, Subscribe, HostSubscribe, Name};
7use abstract_ns::{Address, IpList};
8use futures::{Future, Stream, Async};
9use tokio_core::reactor::{Handle, Timeout};
10
11
12/// A subscriber which polls resolver at a regular interval
13///
14/// Create the instance with `SubscribeExt::interval_subscriber`
15#[derive(Debug)]
16pub struct IntervalSubscriber<R>(Rc<Internal<R>>);
17
18#[derive(Debug)]
19struct Internal<R> {
20    resolver: R,
21    interval: Duration,
22    handle: Handle,
23}
24
25enum State<F> {
26    Sleeping(Timeout),
27    Waiting(F),
28}
29
30/// A stream returned by IntervalSubscriber::subscribe
31pub struct IntervalResolver<R: Resolve> {
32    internal: Rc<Internal<R>>,
33    name: Name,
34    last_value: Option<Address>,
35    state: State<R::Future>,
36}
37
38/// A stream returned by IntervalSubscriber::subscribe_host
39pub struct IntervalHostResolver<R: HostResolve> {
40    internal: Rc<Internal<R>>,
41    name: Name,
42    last_value: Option<IpList>,
43    state: State<R::HostFuture>,
44}
45
46/// An extension trait for resolver
47///
48pub trait SubscribeExt {
49    /// Return a subscriber that uses `resolve` or `resolve_host` at a regular
50    /// interval
51    fn interval_subscriber(self, interval: Duration, handle: &Handle)
52        -> IntervalSubscriber<Self>
53        where Self: Sized;
54}
55
56impl<T: Resolve + HostResolve> SubscribeExt for T {
57    fn interval_subscriber(self, interval: Duration, handle: &Handle)
58        -> IntervalSubscriber<Self>
59        where Self: Sized
60    {
61        IntervalSubscriber(Rc::new(Internal {
62            resolver: self,
63            interval,
64            handle: handle.clone(),
65        }))
66    }
67}
68
69impl<T: Resolve> Resolve for IntervalSubscriber<T> {
70    type Future = T::Future;
71    fn resolve(&self, name: &Name) -> Self::Future {
72        self.0.resolver.resolve(name)
73    }
74}
75
76impl<T: HostResolve> HostResolve for IntervalSubscriber<T> {
77    type HostFuture = T::HostFuture;
78    fn resolve_host(&self, name: &Name) -> Self::HostFuture {
79        self.0.resolver.resolve_host(name)
80    }
81}
82
83impl<T: Resolve> Subscribe for IntervalSubscriber<T> {
84    type Error = <T::Future as Future>::Error;
85    type Stream = IntervalResolver<T>;
86    fn subscribe(&self, name: &Name) -> Self::Stream {
87        IntervalResolver {
88            internal: self.0.clone(),
89            name: name.clone(),
90            last_value: None,
91            state: State::Waiting(self.resolve(name)),
92        }
93    }
94}
95
96impl<T: HostResolve> HostSubscribe for IntervalSubscriber<T> {
97    type HostError = <T::HostFuture as Future>::Error;
98    type HostStream = IntervalHostResolver<T>;
99    fn subscribe_host(&self, name: &Name) -> Self::HostStream {
100        IntervalHostResolver {
101            internal: self.0.clone(),
102            name: name.clone(),
103            last_value: None,
104            state: State::Waiting(self.0.resolver.resolve_host(name)),
105        }
106    }
107}
108
109
110impl<R: HostResolve> Stream for IntervalHostResolver<R> {
111    type Item = IpList;
112    type Error = <R::HostFuture as Future>::Error;
113    fn poll(&mut self) -> Result<Async<Option<IpList>>, Self::Error> {
114        use self::State::*;
115        loop {
116            let mut updated = false;
117            match self.state {
118                Sleeping(ref mut timer) => {
119                    match timer.poll().expect("timer never fails") {
120                        Async::NotReady => return Ok(Async::NotReady),
121                        Async::Ready(()) => {}
122                    }
123                }
124                Waiting(ref mut future) => {
125                    match future.poll()? {
126                        Async::NotReady => return Ok(Async::NotReady),
127                        Async::Ready(a) => {
128                            if self.last_value.as_ref() != Some(&a) {
129                                self.last_value = Some(a);
130                                updated = true;
131                            }
132                        }
133                    }
134                }
135            }
136            match &mut self.state {
137                state @ &mut Sleeping(..) => {
138                    *state = Waiting(self.internal.resolver
139                        .resolve_host(&self.name));
140                }
141                state @ &mut Waiting(..) => {
142                    *state = Sleeping(Timeout::new(
143                        self.internal.interval, &self.internal.handle)
144                        .expect("timeout never fails"));
145                }
146            }
147            if updated {
148                return Ok(Async::Ready(self.last_value.clone()));
149            }
150        }
151    }
152}
153
154impl<R: Resolve> Stream for IntervalResolver<R> {
155    type Item = Address;
156    type Error = <R::Future as Future>::Error;
157    fn poll(&mut self) -> Result<Async<Option<Address>>, Self::Error> {
158        use self::State::*;
159        loop {
160            let mut updated = false;
161            match self.state {
162                Sleeping(ref mut timer) => {
163                    match timer.poll().expect("timer never fails") {
164                        Async::NotReady => return Ok(Async::NotReady),
165                        Async::Ready(()) => {}
166                    }
167                }
168                Waiting(ref mut future) => {
169                    match future.poll()? {
170                        Async::NotReady => return Ok(Async::NotReady),
171                        Async::Ready(a) => {
172                            if self.last_value.as_ref() != Some(&a) {
173                                self.last_value = Some(a);
174                                updated = true;
175                            }
176                        }
177                    }
178                }
179            }
180            match &mut self.state {
181                state @ &mut Sleeping(..) => {
182                    *state = Waiting(self.internal.resolver
183                        .resolve(&self.name));
184                }
185                state @ &mut Waiting(..) => {
186                    *state = Sleeping(Timeout::new(
187                        self.internal.interval, &self.internal.handle)
188                        .expect("timeout never fails"));
189                }
190            }
191            if updated {
192                return Ok(Async::Ready(self.last_value.clone()));
193            }
194        }
195    }
196}
197
198impl<R: Resolve> fmt::Debug for IntervalResolver<R> {
199    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
200        f.debug_struct("InternalResolver")
201        .field("last_value", &self.last_value)
202        .finish()
203    }
204}
205
206impl<R: HostResolve> fmt::Debug for IntervalHostResolver<R> {
207    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
208        f.debug_struct("InternalHostResolver")
209        .field("last_value", &self.last_value)
210        .finish()
211    }
212}