1use std::fmt;
2use std::sync::Arc;
3
4use abstract_ns::{Name, Resolve, HostResolve, Subscribe, HostSubscribe};
5use abstract_ns::{Address, Error};
6use futures::{Stream, Future};
7use futures::future::{empty};
8use futures::stream::{once};
9use futures::sync::oneshot;
10use futures::sync::mpsc::{unbounded, UnboundedSender};
11use tokio_core::reactor::Handle;
12use void::Void;
13
14use config::Config;
15use coroutine::{ResolverFuture};
16use future::{AddrStream, ResolveFuture, HostStream, ResolveHostFuture};
17use future::{UpdateSink};
18use internal::{fail, Request};
19use multisubscr::MultiSubscr;
20use name::{AutoName, InternalName, IntoNameIter};
21use async_slot as slot;
22use subscr::Wrapper;
23
24#[derive(Debug, Clone)]
37pub struct Router {
38 requests: UnboundedSender<Request>,
39}
40
41
42impl Router {
43
44 pub fn from_config(config: &Arc<Config>, handle: &Handle) -> Router {
46 let (tx, rx) = unbounded();
47 handle.spawn(ResolverFuture::new(
48 once(Ok(config.clone())).chain(empty().into_stream()),
49 rx, &handle));
50 Router {
51 requests: tx,
52 }
53 }
54
55 pub fn from_stream<S>(stream: S, handle: &Handle) -> Router
64 where S: Stream<Item=Arc<Config>, Error=Void> + 'static
65 {
66 let (tx, rx) = unbounded();
67 handle.spawn(ResolverFuture::new(stream, rx, &handle));
68 Router {
69 requests: tx,
70 }
71 }
72
73 pub fn updating_config(config: &Arc<Config>, handle: &Handle)
78 -> (Router, UpdateSink)
79 {
80 let (ctx, crx) = slot::channel();
81 let stream = once(Ok(config.clone())).chain(crx)
82 .map_err(|_| unreachable!());
83 let (tx, rx) = unbounded();
84 handle.spawn(ResolverFuture::new(stream, rx, &handle));
85 return (
86 Router {
87 requests: tx,
88 },
89 UpdateSink(ctx),
90 );
91 }
92
93 pub(crate) fn _subscribe_stream<S>(&self,
94 stream: S, tx: slot::Sender<Address>)
95 where S: Stream<Item=Vec<InternalName>> + Send + 'static,
96 S::Error: fmt::Display,
97 {
98 self.requests.unbounded_send(
99 Request::Task(Wrapper::wrap_send(MultiSubscr::new(stream, tx))))
100 .map_err(|_| debug!("Stream subscription when resolver is down"))
104 .ok();
105 }
106
107 pub fn subscribe_many<'x, I>(&self, iter: I, default_port: u16)
123 -> AddrStream
124 where I: IntoIterator,
125 I::Item: Into<AutoName<'x>>,
126 {
127 let (tx, rx) = slot::channel();
128 let mut lst = Vec::new();
129 for addr in iter {
130 match addr.into().parse(default_port) {
131 Ok(x) => lst.push(x),
132 Err(e) => {
133 warn!("Error parsing name: {}", e);
134 }
135 }
136 }
137 self._subscribe_stream(
138 once(Ok::<_, Void>(lst)).chain(empty().into_stream()), tx);
139 AddrStream(rx)
140 }
141
142 #[deprecated(since="0.1.1", note="use subscribe_stream()")]
157 pub fn subscribe_many_stream<'x, S>(&self, stream: S, default_port: u16)
158 -> AddrStream
159 where S: Stream + Send + 'static,
160 S::Item: IntoIterator,
161 S::Error: fmt::Display,
162 <S::Item as IntoIterator>::Item: Into<AutoName<'x>>,
163 {
164 let (tx, rx) = slot::channel();
165 self._subscribe_stream(stream.map(move |iter| {
166 let mut lst = Vec::new();
167 for addr in iter {
168 match addr.into().parse(default_port) {
169 Ok(x) => lst.push(x),
170 Err(e) => {
171 warn!("Error parsing name: {}", e);
172 }
173 }
174 }
175 lst
176 }), tx);
177 AddrStream(rx)
178 }
179 pub fn subscribe_stream<S>(&self, stream: S, default_port: u16)
194 -> AddrStream
195 where S: Stream + Send + 'static,
196 S::Error: fmt::Display,
197 for<'x> S::Item: IntoNameIter<'x>,
198 {
199 let (tx, rx) = slot::channel();
200 self._subscribe_stream(stream.map(move |iter| {
201 let mut lst = Vec::new();
202 for addr in iter.into_name_iter() {
203 match addr.into().parse(default_port) {
204 Ok(x) => lst.push(x),
205 Err(e) => {
206 warn!("Error parsing name: {}", e);
207 }
209 }
210 }
211 lst
212 }), tx);
213 AddrStream(rx)
214 }
215
216 pub fn resolve_auto<'x, N: Into<AutoName<'x>>>(&self,
225 name: N, default_port: u16)
226 -> ResolveFuture
227 {
228 let (tx, rx) = oneshot::channel();
229 match name.into().parse(default_port) {
230 Ok(InternalName::HostPort(name, port)) => {
231 match self.requests.unbounded_send(
232 Request::ResolveHostPort(name.clone(), port, tx))
233 {
234 Ok(()) => {}
235 Err(e) => match e.into_inner() {
236 Request::ResolveHostPort(name, _, tx) => {
237 fail(&name, tx, Error::TemporaryError(
238 "Resolver is down".into()));
239 }
240 _ => unreachable!(),
241 }
242 }
243 }
244 Ok(InternalName::Service(name)) => {
245 match self.requests.unbounded_send(
246 Request::Resolve(name.clone(), tx))
247 {
248 Ok(()) => {}
249 Err(e) => match e.into_inner() {
250 Request::Resolve(name, tx) => {
251 fail(&name, tx, Error::TemporaryError(
252 "Resolver is down".into()));
253 }
254 _ => unreachable!(),
255 }
256 }
257 }
258 Ok(InternalName::Addr(addr)) => {
259 tx.send(Ok(addr.into())).ok();
260 }
261 Err(e) => {
262 tx.send(Err(e.into())).ok();
263 }
264 }
265 ResolveFuture(rx)
266 }
267
268}
269
270impl HostResolve for Router {
271 type HostFuture = ResolveHostFuture;
272 fn resolve_host(&self, name: &Name) -> ResolveHostFuture {
273 let (tx, rx) = oneshot::channel();
274 match self.requests.unbounded_send(
275 Request::ResolveHost(name.clone(), tx))
276 {
277 Ok(()) => {}
278 Err(e) => match e.into_inner() {
279 Request::ResolveHost(name, tx) => {
280 fail(&name, tx, Error::TemporaryError(
281 "Resolver is down".into()));
282 }
283 _ => unreachable!(),
284 }
285 }
286 ResolveHostFuture(rx)
287 }
288}
289
290impl Resolve for Router {
291 type Future = ResolveFuture;
292 fn resolve(&self, name: &Name) -> ResolveFuture {
293 let (tx, rx) = oneshot::channel();
294 match self.requests.unbounded_send(
295 Request::Resolve(name.clone(), tx))
296 {
297 Ok(()) => {}
298 Err(e) => match e.into_inner() {
299 Request::Resolve(name, tx) => {
300 fail(&name, tx, Error::TemporaryError(
301 "Resolver is down".into()));
302 }
303 _ => unreachable!(),
304 }
305 }
306 ResolveFuture(rx)
307 }
308
309}
310
311impl HostSubscribe for Router {
312 type HostError = Void;
313 type HostStream = HostStream;
314 fn subscribe_host(&self, name: &Name) -> HostStream {
315 let (tx, rx) = slot::channel();
316 self.requests.unbounded_send(
317 Request::HostSubscribe(name.clone(), tx))
318 .map_err(|_| debug!("Subscription for {} when resolver is down",
322 name))
323 .ok();
324 HostStream(rx)
325 }
326}
327
328impl Subscribe for Router {
329 type Error = Void;
330 type Stream = AddrStream;
331 fn subscribe(&self, name: &Name) -> AddrStream {
332 let (tx, rx) = slot::channel();
333 self.requests.unbounded_send(
334 Request::Subscribe(name.clone(), tx))
335 .map_err(|_| debug!("Subscription for {} when resolver is down",
339 name))
340 .ok();
341 AddrStream(rx)
342 }
343}
344
345#[cfg(test)]
346#[allow(dead_code)]
347mod type_test {
348 use std::sync::Arc;
349 use futures::Stream;
350 use name::AutoName;
351 use super::Router;
352
353 fn test_vec_string(r: &Router, v: Vec<String>) {
354 drop(r.subscribe_many(&v, 1));
355 }
356
357 fn test_vec_str(r: &Router, v: Vec<&str>) {
358 drop(r.subscribe_many(&v, 1));
359 }
360
361 fn test_vec_auto(r: &Router, v: Vec<AutoName>) {
362 drop(r.subscribe_many(v, 1));
363 }
364
365 fn test_map_auto(r: &Router, v: Vec<&str>) {
366 drop(r.subscribe_many(v.into_iter().map(AutoName::Auto), 1));
367 }
368
369 #[derive(Debug)]
370 enum MyName {
371 Auto(String),
372 Service(String),
373 }
374
375 #[derive(Debug)]
376 struct List1(Arc<Vec<MyName>>);
377
378 #[derive(Debug)]
379 struct List2(Arc<Vec<MyName>>);
380
381 impl<'a> IntoIterator for &'a List1 {
382 type Item = AutoName<'a>;
383 type IntoIter = ::std::iter::Map<::std::slice::Iter<'a, MyName>, fn(&'a MyName) -> AutoName<'a>>;
384 fn into_iter(self) -> Self::IntoIter {
385 self.0.iter().map(|x| x.into())
386 }
387 }
388
389 impl<'a> IntoIterator for &'a List2 {
390 type Item = &'a MyName;
391 type IntoIter = ::std::slice::Iter<'a, MyName>;
392 fn into_iter(self) -> Self::IntoIter {
393 self.0.iter()
394 }
395 }
396
397 impl<'a> Into<AutoName<'a>> for &'a MyName {
398 fn into(self) -> AutoName<'a> {
399 match *self {
400 MyName::Auto(ref x) => AutoName::Auto(x),
401 MyName::Service(ref x) => AutoName::Service(x),
402 }
403 }
404 }
405
406 fn test_subscribe_vec<S>(r: &Router, s: S)
407 where S: Stream<Item=Vec<String>, Error=String> + Send + 'static
408 {
409 drop(r.subscribe_stream(s, 80));
410 }
411
412 fn test_subscribe_vec_custom<S>(r: &Router, s: S)
413 where S: Stream<Item=Vec<MyName>, Error=String> + Send + 'static
414 {
415 drop(r.subscribe_stream(s, 80));
416 }
417
418 fn test_subscribe_arc_vec1<S>(r: &Router, s: S)
419 where S: Stream<Item=List1, Error=String> + Send + 'static
420 {
421 drop(r.subscribe_stream(s, 80));
422 }
423
424 fn test_subscribe_arc_vec2<S>(r: &Router, s: S)
425 where S: Stream<Item=List2, Error=String> + Send + 'static
426 {
427 drop(r.subscribe_stream(s, 80));
428 }
429}