ns_router/
subscribe_ext.rs1use std::fmt;
3use std::time::Duration;
4use std::rc::Rc;
5
6use abstract_ns::{Resolve, HostResolve, Subscribe, HostSubscribe, Name};
7use abstract_ns::{Address, IpList};
8use futures::{Future, Stream, Async};
9use tokio_core::reactor::{Handle, Timeout};
10
11
12#[derive(Debug)]
16pub struct IntervalSubscriber<R>(Rc<Internal<R>>);
17
18#[derive(Debug)]
19struct Internal<R> {
20 resolver: R,
21 interval: Duration,
22 handle: Handle,
23}
24
25enum State<F> {
26 Sleeping(Timeout),
27 Waiting(F),
28}
29
30pub struct IntervalResolver<R: Resolve> {
32 internal: Rc<Internal<R>>,
33 name: Name,
34 last_value: Option<Address>,
35 state: State<R::Future>,
36}
37
38pub struct IntervalHostResolver<R: HostResolve> {
40 internal: Rc<Internal<R>>,
41 name: Name,
42 last_value: Option<IpList>,
43 state: State<R::HostFuture>,
44}
45
46pub trait SubscribeExt {
49 fn interval_subscriber(self, interval: Duration, handle: &Handle)
52 -> IntervalSubscriber<Self>
53 where Self: Sized;
54}
55
56impl<T: Resolve + HostResolve> SubscribeExt for T {
57 fn interval_subscriber(self, interval: Duration, handle: &Handle)
58 -> IntervalSubscriber<Self>
59 where Self: Sized
60 {
61 IntervalSubscriber(Rc::new(Internal {
62 resolver: self,
63 interval,
64 handle: handle.clone(),
65 }))
66 }
67}
68
69impl<T: Resolve> Resolve for IntervalSubscriber<T> {
70 type Future = T::Future;
71 fn resolve(&self, name: &Name) -> Self::Future {
72 self.0.resolver.resolve(name)
73 }
74}
75
76impl<T: HostResolve> HostResolve for IntervalSubscriber<T> {
77 type HostFuture = T::HostFuture;
78 fn resolve_host(&self, name: &Name) -> Self::HostFuture {
79 self.0.resolver.resolve_host(name)
80 }
81}
82
83impl<T: Resolve> Subscribe for IntervalSubscriber<T> {
84 type Error = <T::Future as Future>::Error;
85 type Stream = IntervalResolver<T>;
86 fn subscribe(&self, name: &Name) -> Self::Stream {
87 IntervalResolver {
88 internal: self.0.clone(),
89 name: name.clone(),
90 last_value: None,
91 state: State::Waiting(self.resolve(name)),
92 }
93 }
94}
95
96impl<T: HostResolve> HostSubscribe for IntervalSubscriber<T> {
97 type HostError = <T::HostFuture as Future>::Error;
98 type HostStream = IntervalHostResolver<T>;
99 fn subscribe_host(&self, name: &Name) -> Self::HostStream {
100 IntervalHostResolver {
101 internal: self.0.clone(),
102 name: name.clone(),
103 last_value: None,
104 state: State::Waiting(self.0.resolver.resolve_host(name)),
105 }
106 }
107}
108
109
110impl<R: HostResolve> Stream for IntervalHostResolver<R> {
111 type Item = IpList;
112 type Error = <R::HostFuture as Future>::Error;
113 fn poll(&mut self) -> Result<Async<Option<IpList>>, Self::Error> {
114 use self::State::*;
115 loop {
116 let mut updated = false;
117 match self.state {
118 Sleeping(ref mut timer) => {
119 match timer.poll().expect("timer never fails") {
120 Async::NotReady => return Ok(Async::NotReady),
121 Async::Ready(()) => {}
122 }
123 }
124 Waiting(ref mut future) => {
125 match future.poll()? {
126 Async::NotReady => return Ok(Async::NotReady),
127 Async::Ready(a) => {
128 if self.last_value.as_ref() != Some(&a) {
129 self.last_value = Some(a);
130 updated = true;
131 }
132 }
133 }
134 }
135 }
136 match &mut self.state {
137 state @ &mut Sleeping(..) => {
138 *state = Waiting(self.internal.resolver
139 .resolve_host(&self.name));
140 }
141 state @ &mut Waiting(..) => {
142 *state = Sleeping(Timeout::new(
143 self.internal.interval, &self.internal.handle)
144 .expect("timeout never fails"));
145 }
146 }
147 if updated {
148 return Ok(Async::Ready(self.last_value.clone()));
149 }
150 }
151 }
152}
153
154impl<R: Resolve> Stream for IntervalResolver<R> {
155 type Item = Address;
156 type Error = <R::Future as Future>::Error;
157 fn poll(&mut self) -> Result<Async<Option<Address>>, Self::Error> {
158 use self::State::*;
159 loop {
160 let mut updated = false;
161 match self.state {
162 Sleeping(ref mut timer) => {
163 match timer.poll().expect("timer never fails") {
164 Async::NotReady => return Ok(Async::NotReady),
165 Async::Ready(()) => {}
166 }
167 }
168 Waiting(ref mut future) => {
169 match future.poll()? {
170 Async::NotReady => return Ok(Async::NotReady),
171 Async::Ready(a) => {
172 if self.last_value.as_ref() != Some(&a) {
173 self.last_value = Some(a);
174 updated = true;
175 }
176 }
177 }
178 }
179 }
180 match &mut self.state {
181 state @ &mut Sleeping(..) => {
182 *state = Waiting(self.internal.resolver
183 .resolve(&self.name));
184 }
185 state @ &mut Waiting(..) => {
186 *state = Sleeping(Timeout::new(
187 self.internal.interval, &self.internal.handle)
188 .expect("timeout never fails"));
189 }
190 }
191 if updated {
192 return Ok(Async::Ready(self.last_value.clone()));
193 }
194 }
195 }
196}
197
198impl<R: Resolve> fmt::Debug for IntervalResolver<R> {
199 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
200 f.debug_struct("InternalResolver")
201 .field("last_value", &self.last_value)
202 .finish()
203 }
204}
205
206impl<R: HostResolve> fmt::Debug for IntervalHostResolver<R> {
207 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
208 f.debug_struct("InternalHostResolver")
209 .field("last_value", &self.last_value)
210 .finish()
211 }
212}