rust_consul/
lib.rs

//! Future-aware client for consul
//!
//! This library is a client for consul that gives you a stream of the
//! changes made in consul
5
6#![deny(missing_docs, missing_debug_implementations, warnings)]
7
8extern crate hyper;
9extern crate hyper_tls;
10#[macro_use] extern crate log;
11extern crate futures;
12extern crate native_tls;
13extern crate serde;
14#[macro_use] extern crate serde_derive;
15extern crate serde_json;
16extern crate tokio;
17extern crate url;
18
19use std::error::{Error as StdError};
20use std::fmt::{self, Write};
21use std::io;
22use std::mem;
23use std::net::IpAddr;
24use std::num::ParseIntError;
25use std::str::FromStr;
26use std::time::{Duration, Instant};
27use std::marker::PhantomData;
28
29use serde_json::{from_slice, Error as JsonError, Value as JsonValue};
30use futures::{Stream, Future, Poll, Async};
31use futures::future::{empty as empty_future, Empty};
32use hyper::{Chunk, Body, StatusCode, Uri};
33use hyper::client::{Client as HttpClient, ResponseFuture, HttpConnector};
34use hyper::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
35use hyper::error::{Error as HyperError};
36use hyper_tls::HttpsConnector;
37use native_tls::{Error as TlsError};
38use tokio::reactor::Handle;
39use tokio::timer::Timeout;
40use url::{Url, ParseError as UrlParseError};
41use native_tls::TlsConnector;
42
43type Headers = HeaderMap<HeaderValue>;
44
45/// General errors that breaks the stream
46#[derive(Debug)]
47pub enum Error {
48    /// Error given internaly by hyper
49    Http(HyperError),
50    /// You have polled the watcher from two different threads
51    InvalidState,
52    /// You have given us an invalid url
53    InvalidUrl(UrlParseError),
54    /// Error while initializing tls
55    Tls(TlsError),
56    /// uncatched io error
57    Io(io::Error),
58    /// consul response failed to parse
59    BodyParse(ParseError),
60}
61
62impl fmt::Display for Error {
63    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
64        match *self {
65            Error::Http(ref he) => write!(f, "http error: {}", he),
66            Error::InvalidState => write!(f, "invalid state reached"),
67            Error::InvalidUrl(ref pe) => write!(f, "invalid url: {}", pe),
68            Error::Tls(ref te) => write!(f, "{}", te),
69            Error::Io(ref ie) => write!(f, "{}", ie),
70            Error::BodyParse(ref be) => write!(f, "{}", be),
71        }
72    }
73}
74
75impl StdError for Error {
76    fn description(&self) -> &str {
77        match *self {
78            Error::Http(_) => "http error",
79            Error::InvalidState => "invalid state reached",
80            Error::InvalidUrl(_) => "invalid url",
81            Error::Tls(_) => "Tls initialization problem",
82            Error::Io(_) => "io problem",
83            Error::BodyParse(_) => "body parse problem",
84        }
85    }
86}
87
88impl From<UrlParseError> for Error {
89    fn from(e: UrlParseError) -> Error {
90        Error::InvalidUrl(e)
91    }
92}
93
94impl From<TlsError> for Error {
95    fn from(e: TlsError) -> Error {
96        Error::Tls(e)
97    }
98}
99
100impl From<HyperError> for Error {
101    fn from(e: HyperError) -> Error {
102        Error::Http(e)
103    }
104}
105
106impl From<io::Error> for Error {
107    fn from(e: io::Error) -> Error {
108        Error::Io(e)
109    }
110}
111
112impl From<ParseError> for Error {
113    fn from(e: ParseError) -> Error {
114        Error::BodyParse(e)
115    }
116}
117
118/// Errors related to blocking protocol as defined by consul
119#[derive(Debug, Copy, Clone)]
120pub enum ProtocolError {
121    /// Consul did not reply with X-Consul-Index header
122    BlockingMissing,
123    /// Consul did not reply with Content-Type: application/json
124    ContentTypeNotJson,
125    /// Consul did not reply with 200 Ok status
126    NonOkResult(StatusCode),
127    /// connection refused to consul
128    ConnectionRefused,
129    /// we had an error, and consumer resetted the stream
130    StreamRestarted,
131}
132
133impl fmt::Display for ProtocolError {
134    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
135        match *self {
136            ProtocolError::BlockingMissing => write!(f, "{}", self.description()),
137            ProtocolError::ContentTypeNotJson => write!(f, "{}", self.description()),
138            ProtocolError::NonOkResult(ref status) => write!(f, "Non ok result from consul: {}", status),
139            ProtocolError::ConnectionRefused => write!(f, "connection refused to consul"),
140            ProtocolError::StreamRestarted => write!(f, "consumer restarted the stream"),
141        }
142    }
143}
144
145impl StdError for ProtocolError {
146    fn description(&self) -> &str {
147        match *self {
148            ProtocolError::BlockingMissing => "X-Consul-Index missing from response",
149            ProtocolError::ContentTypeNotJson => "Consul replied with a non-json content",
150            ProtocolError::NonOkResult(_) => "Non ok result from consul",
151            ProtocolError::ConnectionRefused => "connection refused to consul",
152            ProtocolError::StreamRestarted => "consumer restarted the stream",
153        }
154    }
155}
156
157/// Error that Watch may yield *in the stream*
158#[derive(Debug)]
159pub enum ParseError {
160    /// Consul protocol error (missing header, unknown return format)
161    Protocol(ProtocolError),
162    /// Json result does not fit expected format
163    UnexpectedJsonFormat,
164    /// The data is not in json format
165    BodyParsing(JsonError),
166}
167
168impl fmt::Display for ParseError {
169    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
170        match *self {
171            ParseError::Protocol(ref pe) => write!(f, "Protocol error: {}", pe),
172            ParseError::UnexpectedJsonFormat => write!(f, "{}", self.description()),
173            ParseError::BodyParsing(ref je) => write!(f, "Data not in json format: {}", je),
174        }
175    }
176}
177
178impl StdError for ParseError {
179    fn description(&self) -> &str {
180        match *self {
181            ParseError::Protocol(_) => "Protocol error",
182            ParseError::UnexpectedJsonFormat => "Unexpected json format",
183            ParseError::BodyParsing(_) => "Data not in json format",
184        }
185    }
186}
187
188impl From<ProtocolError> for ParseError {
189    fn from(e: ProtocolError) -> ParseError {
190        ParseError::Protocol(e)
191    }
192}
193
/// Index of a blocking query, as read from the `X-Consul-Index` header and
/// sent back through the `index` query parameter.
#[derive(Clone, Copy, Debug)]
struct Blocking {
    /// Last index value seen (0 when none was received yet)
    index: u64,
}
198
199impl Blocking {
200    fn from(headers: &HeaderMap<HeaderValue>) -> Result<Self, ()> {
201        let raw_header: Result<&HeaderValue, ()> = headers.get("X-Consul-Index")
202            .ok_or(());
203        raw_header
204            .and_then(|res| res.to_str().map_err(|_| ()))
205            .and_then(|res| Self::from_str(res).map_err(|_| ()))
206    }
207
208    fn to_string(&self) -> String {
209        let mut out = String::new();
210        let _ = write!(out, "{}", self.index);
211        out
212    }
213
214    fn add_to_uri(&self, uri: &Url) -> Url {
215        let mut uri = uri.clone();
216        uri.query_pairs_mut()
217            .append_pair("index", self.to_string().as_str())
218
219            .finish();
220        uri
221    }
222}
223
224impl Default for Blocking {
225    fn default() -> Blocking {
226        Blocking {
227            index: 0,
228        }
229    }
230}
231
232//impl Header for Blocking {
233//    fn header_name() -> &'static str {
234//        static NAME: &'static str = "X-Consul-Index";
235//        NAME
236//    }
237//
238//    fn parse_header(raw: &Raw) -> HyperResult<Self> {
239//        from_one_raw_str(raw)
240//    }
241//
242//    fn fmt_header(&self, f: &mut HyperFormatter) -> fmt::Result {
243//        f.fmt_line(self)
244//    }
245//}
246
247impl FromStr for Blocking {
248    type Err = ParseIntError;
249    fn from_str(s: &str) -> Result<Self, Self::Err> {
250        let index = s.parse::<u64>()?;
251        Ok(Blocking {
252            index
253        })
254    }
255}
256
257impl fmt::Display for Blocking {
258    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
259        write!(f, "{}", self.index)
260    }
261}
262
263#[derive(Debug)]
264struct BodyBuffer {
265    inner: Body,
266    buffer: Chunk,
267}
268
269impl BodyBuffer {
270    fn new(inner: Body) -> BodyBuffer {
271        BodyBuffer {
272            inner,
273            buffer: Chunk::default(),
274        }
275    }
276}
277
278impl Future for BodyBuffer {
279    type Item = Chunk;
280    type Error = Error;
281
282    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
283        trace!("polling BodyBuffer");
284        loop {
285            match self.inner.poll() {
286                Ok(Async::NotReady) => return Ok(Async::NotReady),
287                Ok(Async::Ready(None)) => {
288                    let buffer = mem::replace(&mut self.buffer, Chunk::default());
289
290                    return Ok(Async::Ready(buffer));
291                },
292                Ok(Async::Ready(Some(data))) => {
293                    self.buffer.extend(data);
294                    // loop, see if there is any more data here
295                },
296                Err(e) => return Err(Error::Http(e)),
297            }
298        }
299    }
300}
301
302/// Consul client
303#[derive(Debug, Clone)]
304pub struct Client {
305    http_client: HttpClient<HttpsConnector<HttpConnector>>,
306    base_uri: Url,
307    handle: Handle,
308}
309
310impl Client {
311    /// Allocate a new consul client
312    pub fn new(base_uri: &str, handle: &Handle) -> Result<Client, Error> {
313        let base_uri = Url::parse(base_uri)?;
314        let threads = 4;
315
316        let mut http = HttpConnector::new(threads);
317        http.enforce_http(false);
318        http.set_reactor(Some(handle.clone()));
319        let tls = TlsConnector::builder().build()?;
320
321        let connector = HttpsConnector::from((http, tls));
322
323        let http_client = HttpClient::builder()
324            .keep_alive(true)
325            .build(connector);
326
327        Ok(Client{
328            http_client,
329            base_uri,
330            handle: handle.clone(),
331        })
332    }
333
334    /// List services in the kernel and watch them
335    pub fn services(&self) -> Watcher<Services> {
336        let mut base_uri = self.base_uri.clone();
337        base_uri.set_path("/v1/catalog/services");
338
339        Watcher{
340            state: WatcherState::Init{
341                base_uri,
342                client: self.clone(),
343                error_strategy: ErrorStrategy::default(),
344            },
345            phantom: PhantomData::<Services>
346        }
347    }
348
349    /// Watch changes of nodes on a service
350    pub fn watch_service(&self, name: &str, passing: bool) -> Watcher<ServiceNodes> {
351        let mut base_uri = self.base_uri.clone();
352        base_uri.set_path("/v1/health/service/");
353        let mut base_uri = base_uri.join(name).unwrap();
354        if passing {
355            base_uri.query_pairs_mut()
356                .append_pair("passing", "true")
357                .finish();
358        }
359
360        Watcher{
361            state: WatcherState::Init{
362                base_uri,
363                client: self.clone(),
364                error_strategy: ErrorStrategy::default(),
365            },
366            phantom: PhantomData::<ServiceNodes>
367        }
368    }
369
370    /// Get agent informations
371    pub fn agent(&self) -> FutureConsul<Agent> {
372        let mut base_uri = self.base_uri.clone();
373        base_uri.set_path("/v1/agent/self");
374
375        FutureConsul{
376            state: FutureState::Init{
377                base_uri,
378                client: self.clone(),
379            },
380            phantom: PhantomData::<Agent>
381        }
382    }
383}
384
/// What a watcher does after a failed request.
#[derive(Debug)]
enum ErrorHandling {
    /// Wait, then retry the request
    RetryBackoff,
}
389
390#[derive(Debug)]
391struct ErrorStrategy {
392    request_timeout: Duration,
393    on_error: ErrorHandling,
394}
395
396impl Default for ErrorStrategy {
397    fn default() -> ErrorStrategy {
398        ErrorStrategy {
399            request_timeout: Duration::new(5, 0),
400            on_error: ErrorHandling::RetryBackoff,
401        }
402    }
403}
404
405#[derive(Debug)]
406struct ErrorState{
407    strategy: ErrorStrategy,
408    current_retries: u64,
409    last_try: Option<Instant>,
410    last_contact: Option<Instant>,
411    last_ok: Option<Instant>,
412    last_error: Option<ProtocolError>,
413}
414
415impl ErrorState {
416    pub fn next_timeout(&self) -> DebugTimeout {
417        let retries = if self.current_retries > 10 {
418            10
419        } else {
420            self.current_retries
421        };
422
423        debug!("Will sleep for {} seconds and retry", retries);
424        let duration = Duration::new(retries, 0);
425
426        DebugTimeout::new(duration)
427    }
428}
429
430impl From<ErrorStrategy> for ErrorState {
431    fn from(strategy: ErrorStrategy) -> ErrorState {
432        ErrorState {
433            strategy,
434            current_retries: 0,
435            last_try: None,
436            last_contact: None,
437            last_ok: None,
438            last_error: None,
439        }
440    }
441}
442
443struct DebugTimeout(Timeout<Empty<(), io::Error>>);
444
445impl DebugTimeout {
446    pub fn new(duration: Duration) -> Self {
447        DebugTimeout(Timeout::new(empty_future(), duration))
448    }
449}
450
451impl fmt::Debug for DebugTimeout {
452    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
453        write!(f, "Timeout")
454    }
455}
456
457impl Future for DebugTimeout {
458    type Item = ();
459    type Error = io::Error;
460
461    #[inline]
462    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
463        trace!("Timeout::poll() called");
464        let res = self.0.poll();
465
466        trace!("res {:?}", res);
467        match res {
468            Err(err) => match err.into_inner() {
469                Some(err) => Err(err),
470                None => Ok(Async::Ready(())),
471            },
472            Ok(ready) => Ok(ready),
473        }
474    }
475}
476
477fn url_to_uri(uri: &Url) -> Uri {
478    let out = Uri::from_str(uri.as_str());
479    if out.is_err() {
480        error!("url malformed: {:?}", uri);
481    }
482
483    // TODO: meh unwrap()
484    out.unwrap()
485}
486
487#[derive(Debug)]
488enum WatcherState{
489    Init{
490        base_uri: Url,
491        client: Client,
492        error_strategy: ErrorStrategy,
493    },
494    Completed {
495        base_uri: Url,
496        client: Client,
497        error_state: ErrorState,
498        blocking: Blocking,
499    },
500    Error {
501        base_uri: Url,
502        client: Client,
503        blocking: Blocking,
504        error_state: ErrorState,
505        retry: Option<DebugTimeout>,
506    },
507    PendingHeaders {
508        base_uri: Url,
509        client: Client,
510        error_state: ErrorState,
511        request: ResponseFuture,
512        blocking: Blocking,
513    },
514    PendingBody {
515        base_uri: Url,
516        client: Client,
517        error_state: ErrorState,
518        blocking: Blocking,
519        headers: Headers,
520        body: BodyBuffer,
521    },
522    Working,
523}
524
525impl Stream for WatcherState {
526    type Item = Result<Chunk, ProtocolError>;
527    type Error = Error;
528
529    fn poll(&mut self) -> Poll<Option<Self::Item>, <WatcherState as Stream>::Error> {
530        trace!("polling WatcherState");
531        loop {
532            match mem::replace(self, WatcherState::Working) {
533                WatcherState::Init{base_uri, client, error_strategy} => {
534                    trace!("querying uri: {}", base_uri);
535
536                    let request = client.http_client.get(url_to_uri(&base_uri));
537                    trace!("{}: no response for now => PendingHeader", base_uri);
538                    *self = WatcherState::PendingHeaders {
539                        base_uri,
540                        client,
541                        request,
542                        error_state: error_strategy.into(),
543                        blocking: Blocking::default(),
544                    };
545                },
546                WatcherState::Completed{base_uri, client, blocking, mut error_state} => {
547                    let uri = blocking.add_to_uri(&base_uri);
548                    trace!("querying uri: {}", uri);
549
550                    error_state.last_try = Some(Instant::now());
551
552                    let request = client.http_client.get(url_to_uri(&uri));
553                    trace!("{}: no response for now => PendingHeader", base_uri);
554                    *self = WatcherState::PendingHeaders {
555                        base_uri,
556                        client,
557                        request,
558                        blocking,
559                        error_state,
560                    };
561                },
562                WatcherState::PendingHeaders{base_uri, client, blocking, mut request, mut error_state} => {
563                    trace!("{}: polling headers", base_uri);
564
565                    match request.poll() {
566                        Err(e) => {
567                            if e.is_connect() {
568                                warn!("{}: got io error: {}", base_uri, e);
569                                let err = ProtocolError::ConnectionRefused;
570                                error_state.last_error = Some(err);
571                                error_state.current_retries += 1;
572                                *self = WatcherState::Error{
573                                     base_uri,
574                                     client,
575                                     blocking,
576                                     error_state,
577                                     retry: None,
578                                };
579                                return Ok(Async::Ready(Some(Err(err))));
580                            } else {
581                                error!("{}: got error, stopping: {}", base_uri, e);
582                                return Err(e.into());
583                            }
584                        },
585                        Ok(Async::Ready(response_headers)) => {
586                            let status = response_headers.status();
587                            let headers = response_headers.headers().clone();
588                            let response_has_json_content_type = headers.get(CONTENT_TYPE).map(|h| h.eq("application/json")).unwrap_or(false);
589                            error_state.last_contact = Some(Instant::now());
590
591                            if status != StatusCode::OK {
592                                warn!("{}: got non-200 status: {}", base_uri, status);
593                                let err = ProtocolError::NonOkResult(status);
594                                error_state.last_error = Some(err);
595                                error_state.current_retries += 1;
596                                *self = WatcherState::Error{
597                                     base_uri,
598                                     client,
599                                     blocking,
600                                     error_state,
601                                     retry: None,
602                                };
603                                return Ok(Async::Ready(Some(Err(err))))
604                            }
605
606
607                            if !response_has_json_content_type {
608                                warn!("{}: got non-json content: {:?}", base_uri, headers);
609                                error_state.last_error = Some(ProtocolError::ContentTypeNotJson);
610                                *self = WatcherState::Error{
611                                     base_uri,
612                                     client,
613                                     blocking,
614                                     error_state,
615                                     retry: None,
616                                };
617                            } else {
618
619                                trace!("{}: got headers {} {:?} => PendingBody", base_uri, status, headers);
620                                let body = BodyBuffer::new(response_headers.into_body());
621
622                                *self = WatcherState::PendingBody {
623                                    base_uri,
624                                    client,
625                                    blocking,
626                                    headers,
627                                    body,
628                                    error_state,
629                                };
630                            };
631                        },
632                        Ok(Async::NotReady) => {
633                            trace!("{}: still no headers => PendingHeaders", base_uri);
634                            *self = WatcherState::PendingHeaders {
635                                base_uri,
636                                client,
637                                blocking,
638                                request,
639                                error_state,
640                            };
641                            return Ok(Async::NotReady);
642                        }
643                    }
644                },
645                WatcherState::PendingBody{base_uri, client, blocking, headers, mut body, mut error_state} => {
646                    trace!("{}: polling body", base_uri);
647
648                    if let Async::Ready(body) = body.poll()? {
649                        debug!("{}: got content: {:?}", base_uri, body);
650                        let new_blocking = Blocking::from(&headers).map_err(|_| ProtocolError::BlockingMissing);
651                        match new_blocking {
652                            Err(err) => {
653                                error!("{}: got error while parsing blocking headers: {:?}, {:?}", base_uri, headers, err);
654                                error_state.last_error = Some(err);
655                                error_state.current_retries += 1;
656                                *self = WatcherState::Error{
657                                     base_uri,
658                                     client,
659                                     error_state,
660                                     blocking,
661
662                                     // The next call to poll() will start the
663                                     // timer (don't generate a timer ifclient
664                                     // does not need)
665                                     retry: None,
666                                };
667                                return Ok(Async::Ready(Some(Err(err))));
668                            },
669                            Ok(blocking) => {
670                                info!("{}: got blocking headers: {}", base_uri, blocking);
671                                error_state.last_ok = Some(Instant::now());
672                                error_state.last_error = None;
673                                error_state.current_retries = 0;
674
675                                *self = WatcherState::Completed{
676                                    base_uri,
677                                    client,
678                                    blocking,
679                                    error_state
680                                };
681
682                                return Ok(Async::Ready(Some(Ok(body))));
683                            }
684                        }
685                    } else {
686                        trace!("{}: still no body => PendingBody", base_uri);
687
688                        *self = WatcherState::PendingBody {
689                            base_uri,
690                            client,
691                            headers,
692                            blocking,
693                            body,
694                            error_state,
695                        };
696                        return Ok(Async::NotReady);
697                    }
698                },
699
700                WatcherState::Error{base_uri, client, blocking, error_state, retry} => {
701                    trace!("{}: still no body => PendingBody", base_uri);
702                    if let Some(mut retry) = retry {
703                        // We have a timeout loaded, see if it resolved
704                        if let Async::Ready(_) = retry.poll()? {
705                            trace!("{}: timeout completed", base_uri);
706                            *self = WatcherState::Completed{
707                                base_uri,
708                                client,
709                                blocking,
710                                error_state
711                            };
712                        } else {
713                            trace!("{}: timeout not completed", base_uri);
714                            *self = WatcherState::Error{
715                                base_uri,
716                                client,
717                                blocking,
718                                error_state,
719                                retry: Some(retry)
720                            };
721                            return Ok(Async::NotReady);
722                        }
723                    } else {
724                        let next_timeout = error_state.next_timeout();
725                        trace!("{}: setting timeout", base_uri);
726                        *self = WatcherState::Error{
727                            base_uri,
728                            client,
729                            blocking,
730                            error_state,
731                            retry: Some(next_timeout),
732                        };
733                        // loop will consume the poll
734                    }
735                }
736
737                // Dead end
738                WatcherState::Working => {
739                    error!("watcher in working state, weird");
740                    return Err(Error::InvalidState);
741                },
742            }
743        }
744    }
745}
746
747/// Watch changes made in consul and parse those changes
748#[derive(Debug)]
749pub struct Watcher<T>{
750    state: WatcherState,
751    phantom: PhantomData<T>,
752}
753
754impl<T> Watcher<T> {
755    /// Whenever the stream yield an error. The stream closes and
756    /// can't be consumed anymore. In such cases, you are required to reset
757    /// the stream. It will then, sleep (according to the error strategy)
758    /// and reconnect to consul.
759    pub fn reset(&mut self) {
760        let (base_uri, client, blocking, mut error_state) = match mem::replace(&mut self.state, WatcherState::Working) {
761            WatcherState::Init{base_uri, client, error_strategy, ..} =>
762                (base_uri, client, Blocking::default(), ErrorState::from(error_strategy)),
763            WatcherState::Completed{base_uri, client, blocking, error_state, ..} |
764            WatcherState::Error{base_uri, client, blocking, error_state, ..} |
765            WatcherState::PendingHeaders{base_uri, client, blocking, error_state, ..} |
766            WatcherState::PendingBody{base_uri, client, blocking, error_state, ..} =>
767                (base_uri, client, blocking, error_state),
768            WatcherState::Working => panic!("stream resetted while polled. State is invalid"),
769        };
770        error_state.last_error = Some(ProtocolError::StreamRestarted);
771        self.state = WatcherState::Error{
772             base_uri,
773             client,
774             blocking,
775             error_state,
776             retry: None,
777        };
778    }
779}
780
781impl<T> Stream for Watcher<T>
782    where T: ConsulType {
783    type Item = Result<T::Reply, ParseError>;
784    type Error = Error;
785
786    #[inline]
787    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
788        // poll()? pattern will bubble up the error
789        match self.state.poll()? {
790            Async::NotReady => Ok(Async::NotReady),
791            Async::Ready(None) => Ok(Async::Ready(None)),
792            Async::Ready(Some(Err(e))) => Ok(Async::Ready(Some(Err(e.into())))),
793            Async::Ready(Some(Ok(body))) => {
794                Ok(Async::Ready(Some(T::parse(&body))))
795            }
796        }
797    }
798}
799
800
801/// Trait for parsing types out of consul
802pub trait ConsulType {
803    /// The kind of replies this parser yields
804    type Reply;
805
806    /// Parse an http body and give back a result
807    fn parse(buf: &Chunk) -> Result<Self::Reply, ParseError>;
808}
809
810fn read_map_service_name(value: &JsonValue) -> Result<Vec<ServiceName>, ParseError> {
811    if let &JsonValue::Object(ref map) = value {
812        let mut out = Vec::with_capacity(map.len());
813        for (k, v) in map.iter() {
814            if let &JsonValue::Array(ref _values) = v {
815                if k != "consul" {
816                    out.push(k.clone());
817                }
818            } else {
819                return Err(ParseError::UnexpectedJsonFormat)
820            }
821        }
822        Ok(out)
823    } else {
824        Err(ParseError::UnexpectedJsonFormat)
825    }
826}
827
/// Name of a service as used in consul
pub type ServiceName = String;
830
/// Parser for the list of services known to consul
#[derive(Debug)]
pub struct Services {}
834impl ConsulType for Services {
835    type Reply = Vec<ServiceName>;
836
837    fn parse(buf: &Chunk) -> Result<Self::Reply, ParseError> {
838         let v: JsonValue = from_slice(&buf).map_err(ParseError::BodyParsing)?;
839         let res = read_map_service_name(&v)?;
840
841         Ok(res)
842    }
843}
844
/// Parser for the list of nodes hosting a service in consul
#[derive(Debug)]
pub struct ServiceNodes {}
848impl ConsulType for ServiceNodes {
849    type Reply = Vec<Node>;
850
851    fn parse(buf: &Chunk) -> Result<Self::Reply, ParseError> {
852         let v: Vec<TempNode> = from_slice(&buf).map_err(ParseError::BodyParsing)?;
853
854         Ok(v.into_iter().map(|x| x.node).collect())
855    }
856}
857
858#[derive(Deserialize)]
859struct TempNode {
860    #[serde(rename = "Node")]
861    node: Node,
862}
863
864/// Node hosting services
865#[derive(Debug, Deserialize, PartialEq, Clone)]
866pub struct Node {
867    /// Node name
868    #[serde(rename = "Node")]
869    pub name: String,
870
871    /// Node address
872    #[serde(rename = "Address")]
873    pub address: IpAddr,
874}
875
876/// A future response from consul
877#[derive(Debug)]
878pub struct FutureConsul<T> {
879    state: FutureState,
880    phantom: PhantomData<T>,
881}
882
883impl<T> Future for FutureConsul<T>
884    where T: ConsulType {
885    type Item = T::Reply;
886    type Error = Error;
887
888    #[inline]
889    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
890        // poll()? pattern will bubble up the error
891        match self.state.poll()? {
892            Async::NotReady => Ok(Async::NotReady),
893            Async::Ready(body) => {
894                T::parse(&body).map(|res| {
895                   Async::Ready(res)
896                }).map_err(|e| Error::BodyParse(e))
897            },
898        }
899    }
900}
901
902#[derive(Debug)]
903enum FutureState {
904    Init {
905        base_uri: Url,
906        client: Client,
907    },
908    PendingHeaders {
909        base_uri: Url,
910        client: Client,
911        request: ResponseFuture,
912    },
913    PendingBody {
914        base_uri: Url,
915        client: Client,
916        headers: Headers,
917        body: BodyBuffer,
918    },
919    Done,
920    Working,
921}
922
923impl Future for FutureState {
924    type Item = Chunk;
925    type Error = Error;
926
927    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
928        trace!("polling FutureState");
929        loop {
930            match mem::replace(self, FutureState::Working) {
931                FutureState::Init{base_uri, client} => {
932                    trace!("querying uri: {}", base_uri);
933
934                    let request = client.http_client.get(url_to_uri(&base_uri));
935                    trace!("no response for now => PendingHeader");
936                    *self = FutureState::PendingHeaders {
937                        base_uri,
938                        client,
939                        request,
940                    };
941                },
942                FutureState::PendingHeaders{base_uri, client, mut request} => {
943                    trace!("polling headers");
944
945                    match request.poll()? {
946                        Async::Ready(response_headers) => {
947                            let status = response_headers.status();
948                            let headers = response_headers.headers().clone();
949                            let response_has_json_content_type = headers.get(CONTENT_TYPE).map(|h| h.eq("application/json")).unwrap_or(false);
950                            if status != StatusCode::OK {
951                                let err = ProtocolError::NonOkResult(status);
952                                return Err(Error::BodyParse(ParseError::Protocol(err)))
953                            } else if !response_has_json_content_type {
954                                let err = ProtocolError::ContentTypeNotJson;
955                                return Err(Error::BodyParse(ParseError::Protocol(err)))
956                            } else {
957                                trace!("got headers {} {:?} => PendingBody", status, headers);
958                                let body = BodyBuffer::new(response_headers.into_body());
959                                *self = FutureState::PendingBody {
960                                    base_uri,
961                                    client,
962                                    headers,
963                                    body,
964                                };
965                            }
966                        },
967                        Async::NotReady => {
968                            trace!("still no headers => PendingHeaders");
969                            *self = FutureState::PendingHeaders {
970                                base_uri,
971                                client,
972                                request,
973                            };
974                            return Ok(Async::NotReady);
975                        },
976                    }
977                },
978                FutureState::PendingBody{base_uri, client, headers, mut body} => {
979                    trace!("polling body");
980
981                    if let Async::Ready(body) = body.poll()? {
982                        *self = FutureState::Done;
983                        return Ok(Async::Ready(body));
984                    } else {
985                        *self = FutureState::PendingBody{
986                            base_uri,
987                            client,
988                            headers,
989                            body
990                        };
991                        return Ok(Async::NotReady);
992                    }
993                }
994
995                // Dead end
996                FutureState::Working | FutureState::Done => {
997                    return Err(Error::InvalidState);
998                },
999            }
1000        }
1001    }
1002}
1003
1004#[derive(Deserialize)]
1005struct InnerAgent {
1006    #[serde(rename = "Member")]
1007    member: InnerMember,
1008}
1009
1010#[derive(Deserialize)]
1011struct InnerMember {
1012    #[serde(rename = "Addr")]
1013    addr: IpAddr,
1014}
1015
1016/// Parse node list from services in consul
1017#[derive(Debug)]
1018pub struct Agent {
1019    /// public ip address used by this address
1020    pub member_address: IpAddr,
1021}
1022
1023impl ConsulType for Agent {
1024    type Reply = Agent;
1025
1026    fn parse(buf: &Chunk) -> Result<Self::Reply, ParseError> {
1027        let agent: InnerAgent = serde_json::from_slice(&buf).map_err(ParseError::BodyParsing)?;
1028        Ok(Agent {
1029            member_address: agent.member.addr,
1030        })
1031    }
1032}
1033