1#![deny(missing_docs, missing_debug_implementations, warnings)]
7
8extern crate hyper;
9extern crate hyper_tls;
10#[macro_use] extern crate log;
11extern crate futures;
12extern crate native_tls;
13extern crate serde;
14#[macro_use] extern crate serde_derive;
15extern crate serde_json;
16extern crate tokio;
17extern crate url;
18
19use std::error::{Error as StdError};
20use std::fmt::{self, Write};
21use std::io;
22use std::mem;
23use std::net::IpAddr;
24use std::num::ParseIntError;
25use std::str::FromStr;
26use std::time::{Duration, Instant};
27use std::marker::PhantomData;
28
29use serde_json::{from_slice, Error as JsonError, Value as JsonValue};
30use futures::{Stream, Future, Poll, Async};
31use futures::future::{empty as empty_future, Empty};
32use hyper::{Chunk, Body, StatusCode, Uri};
33use hyper::client::{Client as HttpClient, ResponseFuture, HttpConnector};
34use hyper::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
35use hyper::error::{Error as HyperError};
36use hyper_tls::HttpsConnector;
37use native_tls::{Error as TlsError};
38use tokio::reactor::Handle;
39use tokio::timer::Timeout;
40use url::{Url, ParseError as UrlParseError};
41use native_tls::TlsConnector;
42
43type Headers = HeaderMap<HeaderValue>;
44
45#[derive(Debug)]
47pub enum Error {
48 Http(HyperError),
50 InvalidState,
52 InvalidUrl(UrlParseError),
54 Tls(TlsError),
56 Io(io::Error),
58 BodyParse(ParseError),
60}
61
62impl fmt::Display for Error {
63 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
64 match *self {
65 Error::Http(ref he) => write!(f, "http error: {}", he),
66 Error::InvalidState => write!(f, "invalid state reached"),
67 Error::InvalidUrl(ref pe) => write!(f, "invalid url: {}", pe),
68 Error::Tls(ref te) => write!(f, "{}", te),
69 Error::Io(ref ie) => write!(f, "{}", ie),
70 Error::BodyParse(ref be) => write!(f, "{}", be),
71 }
72 }
73}
74
75impl StdError for Error {
76 fn description(&self) -> &str {
77 match *self {
78 Error::Http(_) => "http error",
79 Error::InvalidState => "invalid state reached",
80 Error::InvalidUrl(_) => "invalid url",
81 Error::Tls(_) => "Tls initialization problem",
82 Error::Io(_) => "io problem",
83 Error::BodyParse(_) => "body parse problem",
84 }
85 }
86}
87
88impl From<UrlParseError> for Error {
89 fn from(e: UrlParseError) -> Error {
90 Error::InvalidUrl(e)
91 }
92}
93
94impl From<TlsError> for Error {
95 fn from(e: TlsError) -> Error {
96 Error::Tls(e)
97 }
98}
99
100impl From<HyperError> for Error {
101 fn from(e: HyperError) -> Error {
102 Error::Http(e)
103 }
104}
105
106impl From<io::Error> for Error {
107 fn from(e: io::Error) -> Error {
108 Error::Io(e)
109 }
110}
111
112impl From<ParseError> for Error {
113 fn from(e: ParseError) -> Error {
114 Error::BodyParse(e)
115 }
116}
117
118#[derive(Debug, Copy, Clone)]
120pub enum ProtocolError {
121 BlockingMissing,
123 ContentTypeNotJson,
125 NonOkResult(StatusCode),
127 ConnectionRefused,
129 StreamRestarted,
131}
132
133impl fmt::Display for ProtocolError {
134 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
135 match *self {
136 ProtocolError::BlockingMissing => write!(f, "{}", self.description()),
137 ProtocolError::ContentTypeNotJson => write!(f, "{}", self.description()),
138 ProtocolError::NonOkResult(ref status) => write!(f, "Non ok result from consul: {}", status),
139 ProtocolError::ConnectionRefused => write!(f, "connection refused to consul"),
140 ProtocolError::StreamRestarted => write!(f, "consumer restarted the stream"),
141 }
142 }
143}
144
145impl StdError for ProtocolError {
146 fn description(&self) -> &str {
147 match *self {
148 ProtocolError::BlockingMissing => "X-Consul-Index missing from response",
149 ProtocolError::ContentTypeNotJson => "Consul replied with a non-json content",
150 ProtocolError::NonOkResult(_) => "Non ok result from consul",
151 ProtocolError::ConnectionRefused => "connection refused to consul",
152 ProtocolError::StreamRestarted => "consumer restarted the stream",
153 }
154 }
155}
156
157#[derive(Debug)]
159pub enum ParseError {
160 Protocol(ProtocolError),
162 UnexpectedJsonFormat,
164 BodyParsing(JsonError),
166}
167
168impl fmt::Display for ParseError {
169 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
170 match *self {
171 ParseError::Protocol(ref pe) => write!(f, "Protocol error: {}", pe),
172 ParseError::UnexpectedJsonFormat => write!(f, "{}", self.description()),
173 ParseError::BodyParsing(ref je) => write!(f, "Data not in json format: {}", je),
174 }
175 }
176}
177
178impl StdError for ParseError {
179 fn description(&self) -> &str {
180 match *self {
181 ParseError::Protocol(_) => "Protocol error",
182 ParseError::UnexpectedJsonFormat => "Unexpected json format",
183 ParseError::BodyParsing(_) => "Data not in json format",
184 }
185 }
186}
187
188impl From<ProtocolError> for ParseError {
189 fn from(e: ProtocolError) -> ParseError {
190 ParseError::Protocol(e)
191 }
192}
193
194#[derive(Clone, Copy, Debug)]
195struct Blocking {
196 index: u64,
197}
198
199impl Blocking {
200 fn from(headers: &HeaderMap<HeaderValue>) -> Result<Self, ()> {
201 let raw_header: Result<&HeaderValue, ()> = headers.get("X-Consul-Index")
202 .ok_or(());
203 raw_header
204 .and_then(|res| res.to_str().map_err(|_| ()))
205 .and_then(|res| Self::from_str(res).map_err(|_| ()))
206 }
207
208 fn to_string(&self) -> String {
209 let mut out = String::new();
210 let _ = write!(out, "{}", self.index);
211 out
212 }
213
214 fn add_to_uri(&self, uri: &Url) -> Url {
215 let mut uri = uri.clone();
216 uri.query_pairs_mut()
217 .append_pair("index", self.to_string().as_str())
218
219 .finish();
220 uri
221 }
222}
223
224impl Default for Blocking {
225 fn default() -> Blocking {
226 Blocking {
227 index: 0,
228 }
229 }
230}
231
232impl FromStr for Blocking {
248 type Err = ParseIntError;
249 fn from_str(s: &str) -> Result<Self, Self::Err> {
250 let index = s.parse::<u64>()?;
251 Ok(Blocking {
252 index
253 })
254 }
255}
256
257impl fmt::Display for Blocking {
258 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
259 write!(f, "{}", self.index)
260 }
261}
262
263#[derive(Debug)]
264struct BodyBuffer {
265 inner: Body,
266 buffer: Chunk,
267}
268
269impl BodyBuffer {
270 fn new(inner: Body) -> BodyBuffer {
271 BodyBuffer {
272 inner,
273 buffer: Chunk::default(),
274 }
275 }
276}
277
278impl Future for BodyBuffer {
279 type Item = Chunk;
280 type Error = Error;
281
282 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
283 trace!("polling BodyBuffer");
284 loop {
285 match self.inner.poll() {
286 Ok(Async::NotReady) => return Ok(Async::NotReady),
287 Ok(Async::Ready(None)) => {
288 let buffer = mem::replace(&mut self.buffer, Chunk::default());
289
290 return Ok(Async::Ready(buffer));
291 },
292 Ok(Async::Ready(Some(data))) => {
293 self.buffer.extend(data);
294 },
296 Err(e) => return Err(Error::Http(e)),
297 }
298 }
299 }
300}
301
302#[derive(Debug, Clone)]
304pub struct Client {
305 http_client: HttpClient<HttpsConnector<HttpConnector>>,
306 base_uri: Url,
307 handle: Handle,
308}
309
310impl Client {
311 pub fn new(base_uri: &str, handle: &Handle) -> Result<Client, Error> {
313 let base_uri = Url::parse(base_uri)?;
314 let threads = 4;
315
316 let mut http = HttpConnector::new(threads);
317 http.enforce_http(false);
318 http.set_reactor(Some(handle.clone()));
319 let tls = TlsConnector::builder().build()?;
320
321 let connector = HttpsConnector::from((http, tls));
322
323 let http_client = HttpClient::builder()
324 .keep_alive(true)
325 .build(connector);
326
327 Ok(Client{
328 http_client,
329 base_uri,
330 handle: handle.clone(),
331 })
332 }
333
334 pub fn services(&self) -> Watcher<Services> {
336 let mut base_uri = self.base_uri.clone();
337 base_uri.set_path("/v1/catalog/services");
338
339 Watcher{
340 state: WatcherState::Init{
341 base_uri,
342 client: self.clone(),
343 error_strategy: ErrorStrategy::default(),
344 },
345 phantom: PhantomData::<Services>
346 }
347 }
348
349 pub fn watch_service(&self, name: &str, passing: bool) -> Watcher<ServiceNodes> {
351 let mut base_uri = self.base_uri.clone();
352 base_uri.set_path("/v1/health/service/");
353 let mut base_uri = base_uri.join(name).unwrap();
354 if passing {
355 base_uri.query_pairs_mut()
356 .append_pair("passing", "true")
357 .finish();
358 }
359
360 Watcher{
361 state: WatcherState::Init{
362 base_uri,
363 client: self.clone(),
364 error_strategy: ErrorStrategy::default(),
365 },
366 phantom: PhantomData::<ServiceNodes>
367 }
368 }
369
370 pub fn agent(&self) -> FutureConsul<Agent> {
372 let mut base_uri = self.base_uri.clone();
373 base_uri.set_path("/v1/agent/self");
374
375 FutureConsul{
376 state: FutureState::Init{
377 base_uri,
378 client: self.clone(),
379 },
380 phantom: PhantomData::<Agent>
381 }
382 }
383}
384
385#[derive(Debug)]
386enum ErrorHandling {
387 RetryBackoff,
388}
389
390#[derive(Debug)]
391struct ErrorStrategy {
392 request_timeout: Duration,
393 on_error: ErrorHandling,
394}
395
396impl Default for ErrorStrategy {
397 fn default() -> ErrorStrategy {
398 ErrorStrategy {
399 request_timeout: Duration::new(5, 0),
400 on_error: ErrorHandling::RetryBackoff,
401 }
402 }
403}
404
405#[derive(Debug)]
406struct ErrorState{
407 strategy: ErrorStrategy,
408 current_retries: u64,
409 last_try: Option<Instant>,
410 last_contact: Option<Instant>,
411 last_ok: Option<Instant>,
412 last_error: Option<ProtocolError>,
413}
414
415impl ErrorState {
416 pub fn next_timeout(&self) -> DebugTimeout {
417 let retries = if self.current_retries > 10 {
418 10
419 } else {
420 self.current_retries
421 };
422
423 debug!("Will sleep for {} seconds and retry", retries);
424 let duration = Duration::new(retries, 0);
425
426 DebugTimeout::new(duration)
427 }
428}
429
430impl From<ErrorStrategy> for ErrorState {
431 fn from(strategy: ErrorStrategy) -> ErrorState {
432 ErrorState {
433 strategy,
434 current_retries: 0,
435 last_try: None,
436 last_contact: None,
437 last_ok: None,
438 last_error: None,
439 }
440 }
441}
442
443struct DebugTimeout(Timeout<Empty<(), io::Error>>);
444
445impl DebugTimeout {
446 pub fn new(duration: Duration) -> Self {
447 DebugTimeout(Timeout::new(empty_future(), duration))
448 }
449}
450
451impl fmt::Debug for DebugTimeout {
452 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
453 write!(f, "Timeout")
454 }
455}
456
457impl Future for DebugTimeout {
458 type Item = ();
459 type Error = io::Error;
460
461 #[inline]
462 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
463 trace!("Timeout::poll() called");
464 let res = self.0.poll();
465
466 trace!("res {:?}", res);
467 match res {
468 Err(err) => match err.into_inner() {
469 Some(err) => Err(err),
470 None => Ok(Async::Ready(())),
471 },
472 Ok(ready) => Ok(ready),
473 }
474 }
475}
476
477fn url_to_uri(uri: &Url) -> Uri {
478 let out = Uri::from_str(uri.as_str());
479 if out.is_err() {
480 error!("url malformed: {:?}", uri);
481 }
482
483 out.unwrap()
485}
486
487#[derive(Debug)]
488enum WatcherState{
489 Init{
490 base_uri: Url,
491 client: Client,
492 error_strategy: ErrorStrategy,
493 },
494 Completed {
495 base_uri: Url,
496 client: Client,
497 error_state: ErrorState,
498 blocking: Blocking,
499 },
500 Error {
501 base_uri: Url,
502 client: Client,
503 blocking: Blocking,
504 error_state: ErrorState,
505 retry: Option<DebugTimeout>,
506 },
507 PendingHeaders {
508 base_uri: Url,
509 client: Client,
510 error_state: ErrorState,
511 request: ResponseFuture,
512 blocking: Blocking,
513 },
514 PendingBody {
515 base_uri: Url,
516 client: Client,
517 error_state: ErrorState,
518 blocking: Blocking,
519 headers: Headers,
520 body: BodyBuffer,
521 },
522 Working,
523}
524
525impl Stream for WatcherState {
526 type Item = Result<Chunk, ProtocolError>;
527 type Error = Error;
528
529 fn poll(&mut self) -> Poll<Option<Self::Item>, <WatcherState as Stream>::Error> {
530 trace!("polling WatcherState");
531 loop {
532 match mem::replace(self, WatcherState::Working) {
533 WatcherState::Init{base_uri, client, error_strategy} => {
534 trace!("querying uri: {}", base_uri);
535
536 let request = client.http_client.get(url_to_uri(&base_uri));
537 trace!("{}: no response for now => PendingHeader", base_uri);
538 *self = WatcherState::PendingHeaders {
539 base_uri,
540 client,
541 request,
542 error_state: error_strategy.into(),
543 blocking: Blocking::default(),
544 };
545 },
546 WatcherState::Completed{base_uri, client, blocking, mut error_state} => {
547 let uri = blocking.add_to_uri(&base_uri);
548 trace!("querying uri: {}", uri);
549
550 error_state.last_try = Some(Instant::now());
551
552 let request = client.http_client.get(url_to_uri(&uri));
553 trace!("{}: no response for now => PendingHeader", base_uri);
554 *self = WatcherState::PendingHeaders {
555 base_uri,
556 client,
557 request,
558 blocking,
559 error_state,
560 };
561 },
562 WatcherState::PendingHeaders{base_uri, client, blocking, mut request, mut error_state} => {
563 trace!("{}: polling headers", base_uri);
564
565 match request.poll() {
566 Err(e) => {
567 if e.is_connect() {
568 warn!("{}: got io error: {}", base_uri, e);
569 let err = ProtocolError::ConnectionRefused;
570 error_state.last_error = Some(err);
571 error_state.current_retries += 1;
572 *self = WatcherState::Error{
573 base_uri,
574 client,
575 blocking,
576 error_state,
577 retry: None,
578 };
579 return Ok(Async::Ready(Some(Err(err))));
580 } else {
581 error!("{}: got error, stopping: {}", base_uri, e);
582 return Err(e.into());
583 }
584 },
585 Ok(Async::Ready(response_headers)) => {
586 let status = response_headers.status();
587 let headers = response_headers.headers().clone();
588 let response_has_json_content_type = headers.get(CONTENT_TYPE).map(|h| h.eq("application/json")).unwrap_or(false);
589 error_state.last_contact = Some(Instant::now());
590
591 if status != StatusCode::OK {
592 warn!("{}: got non-200 status: {}", base_uri, status);
593 let err = ProtocolError::NonOkResult(status);
594 error_state.last_error = Some(err);
595 error_state.current_retries += 1;
596 *self = WatcherState::Error{
597 base_uri,
598 client,
599 blocking,
600 error_state,
601 retry: None,
602 };
603 return Ok(Async::Ready(Some(Err(err))))
604 }
605
606
607 if !response_has_json_content_type {
608 warn!("{}: got non-json content: {:?}", base_uri, headers);
609 error_state.last_error = Some(ProtocolError::ContentTypeNotJson);
610 *self = WatcherState::Error{
611 base_uri,
612 client,
613 blocking,
614 error_state,
615 retry: None,
616 };
617 } else {
618
619 trace!("{}: got headers {} {:?} => PendingBody", base_uri, status, headers);
620 let body = BodyBuffer::new(response_headers.into_body());
621
622 *self = WatcherState::PendingBody {
623 base_uri,
624 client,
625 blocking,
626 headers,
627 body,
628 error_state,
629 };
630 };
631 },
632 Ok(Async::NotReady) => {
633 trace!("{}: still no headers => PendingHeaders", base_uri);
634 *self = WatcherState::PendingHeaders {
635 base_uri,
636 client,
637 blocking,
638 request,
639 error_state,
640 };
641 return Ok(Async::NotReady);
642 }
643 }
644 },
645 WatcherState::PendingBody{base_uri, client, blocking, headers, mut body, mut error_state} => {
646 trace!("{}: polling body", base_uri);
647
648 if let Async::Ready(body) = body.poll()? {
649 debug!("{}: got content: {:?}", base_uri, body);
650 let new_blocking = Blocking::from(&headers).map_err(|_| ProtocolError::BlockingMissing);
651 match new_blocking {
652 Err(err) => {
653 error!("{}: got error while parsing blocking headers: {:?}, {:?}", base_uri, headers, err);
654 error_state.last_error = Some(err);
655 error_state.current_retries += 1;
656 *self = WatcherState::Error{
657 base_uri,
658 client,
659 error_state,
660 blocking,
661
662 retry: None,
666 };
667 return Ok(Async::Ready(Some(Err(err))));
668 },
669 Ok(blocking) => {
670 info!("{}: got blocking headers: {}", base_uri, blocking);
671 error_state.last_ok = Some(Instant::now());
672 error_state.last_error = None;
673 error_state.current_retries = 0;
674
675 *self = WatcherState::Completed{
676 base_uri,
677 client,
678 blocking,
679 error_state
680 };
681
682 return Ok(Async::Ready(Some(Ok(body))));
683 }
684 }
685 } else {
686 trace!("{}: still no body => PendingBody", base_uri);
687
688 *self = WatcherState::PendingBody {
689 base_uri,
690 client,
691 headers,
692 blocking,
693 body,
694 error_state,
695 };
696 return Ok(Async::NotReady);
697 }
698 },
699
700 WatcherState::Error{base_uri, client, blocking, error_state, retry} => {
701 trace!("{}: still no body => PendingBody", base_uri);
702 if let Some(mut retry) = retry {
703 if let Async::Ready(_) = retry.poll()? {
705 trace!("{}: timeout completed", base_uri);
706 *self = WatcherState::Completed{
707 base_uri,
708 client,
709 blocking,
710 error_state
711 };
712 } else {
713 trace!("{}: timeout not completed", base_uri);
714 *self = WatcherState::Error{
715 base_uri,
716 client,
717 blocking,
718 error_state,
719 retry: Some(retry)
720 };
721 return Ok(Async::NotReady);
722 }
723 } else {
724 let next_timeout = error_state.next_timeout();
725 trace!("{}: setting timeout", base_uri);
726 *self = WatcherState::Error{
727 base_uri,
728 client,
729 blocking,
730 error_state,
731 retry: Some(next_timeout),
732 };
733 }
735 }
736
737 WatcherState::Working => {
739 error!("watcher in working state, weird");
740 return Err(Error::InvalidState);
741 },
742 }
743 }
744 }
745}
746
747#[derive(Debug)]
749pub struct Watcher<T>{
750 state: WatcherState,
751 phantom: PhantomData<T>,
752}
753
754impl<T> Watcher<T> {
755 pub fn reset(&mut self) {
760 let (base_uri, client, blocking, mut error_state) = match mem::replace(&mut self.state, WatcherState::Working) {
761 WatcherState::Init{base_uri, client, error_strategy, ..} =>
762 (base_uri, client, Blocking::default(), ErrorState::from(error_strategy)),
763 WatcherState::Completed{base_uri, client, blocking, error_state, ..} |
764 WatcherState::Error{base_uri, client, blocking, error_state, ..} |
765 WatcherState::PendingHeaders{base_uri, client, blocking, error_state, ..} |
766 WatcherState::PendingBody{base_uri, client, blocking, error_state, ..} =>
767 (base_uri, client, blocking, error_state),
768 WatcherState::Working => panic!("stream resetted while polled. State is invalid"),
769 };
770 error_state.last_error = Some(ProtocolError::StreamRestarted);
771 self.state = WatcherState::Error{
772 base_uri,
773 client,
774 blocking,
775 error_state,
776 retry: None,
777 };
778 }
779}
780
781impl<T> Stream for Watcher<T>
782 where T: ConsulType {
783 type Item = Result<T::Reply, ParseError>;
784 type Error = Error;
785
786 #[inline]
787 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
788 match self.state.poll()? {
790 Async::NotReady => Ok(Async::NotReady),
791 Async::Ready(None) => Ok(Async::Ready(None)),
792 Async::Ready(Some(Err(e))) => Ok(Async::Ready(Some(Err(e.into())))),
793 Async::Ready(Some(Ok(body))) => {
794 Ok(Async::Ready(Some(T::parse(&body))))
795 }
796 }
797 }
798}
799
800
801pub trait ConsulType {
803 type Reply;
805
806 fn parse(buf: &Chunk) -> Result<Self::Reply, ParseError>;
808}
809
810fn read_map_service_name(value: &JsonValue) -> Result<Vec<ServiceName>, ParseError> {
811 if let &JsonValue::Object(ref map) = value {
812 let mut out = Vec::with_capacity(map.len());
813 for (k, v) in map.iter() {
814 if let &JsonValue::Array(ref _values) = v {
815 if k != "consul" {
816 out.push(k.clone());
817 }
818 } else {
819 return Err(ParseError::UnexpectedJsonFormat)
820 }
821 }
822 Ok(out)
823 } else {
824 Err(ParseError::UnexpectedJsonFormat)
825 }
826}
827
828pub type ServiceName = String;
830
831#[derive(Debug)]
833pub struct Services {}
834impl ConsulType for Services {
835 type Reply = Vec<ServiceName>;
836
837 fn parse(buf: &Chunk) -> Result<Self::Reply, ParseError> {
838 let v: JsonValue = from_slice(&buf).map_err(ParseError::BodyParsing)?;
839 let res = read_map_service_name(&v)?;
840
841 Ok(res)
842 }
843}
844
845#[derive(Debug)]
847pub struct ServiceNodes {}
848impl ConsulType for ServiceNodes {
849 type Reply = Vec<Node>;
850
851 fn parse(buf: &Chunk) -> Result<Self::Reply, ParseError> {
852 let v: Vec<TempNode> = from_slice(&buf).map_err(ParseError::BodyParsing)?;
853
854 Ok(v.into_iter().map(|x| x.node).collect())
855 }
856}
857
858#[derive(Deserialize)]
859struct TempNode {
860 #[serde(rename = "Node")]
861 node: Node,
862}
863
864#[derive(Debug, Deserialize, PartialEq, Clone)]
866pub struct Node {
867 #[serde(rename = "Node")]
869 pub name: String,
870
871 #[serde(rename = "Address")]
873 pub address: IpAddr,
874}
875
876#[derive(Debug)]
878pub struct FutureConsul<T> {
879 state: FutureState,
880 phantom: PhantomData<T>,
881}
882
883impl<T> Future for FutureConsul<T>
884 where T: ConsulType {
885 type Item = T::Reply;
886 type Error = Error;
887
888 #[inline]
889 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
890 match self.state.poll()? {
892 Async::NotReady => Ok(Async::NotReady),
893 Async::Ready(body) => {
894 T::parse(&body).map(|res| {
895 Async::Ready(res)
896 }).map_err(|e| Error::BodyParse(e))
897 },
898 }
899 }
900}
901
902#[derive(Debug)]
903enum FutureState {
904 Init {
905 base_uri: Url,
906 client: Client,
907 },
908 PendingHeaders {
909 base_uri: Url,
910 client: Client,
911 request: ResponseFuture,
912 },
913 PendingBody {
914 base_uri: Url,
915 client: Client,
916 headers: Headers,
917 body: BodyBuffer,
918 },
919 Done,
920 Working,
921}
922
923impl Future for FutureState {
924 type Item = Chunk;
925 type Error = Error;
926
927 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
928 trace!("polling FutureState");
929 loop {
930 match mem::replace(self, FutureState::Working) {
931 FutureState::Init{base_uri, client} => {
932 trace!("querying uri: {}", base_uri);
933
934 let request = client.http_client.get(url_to_uri(&base_uri));
935 trace!("no response for now => PendingHeader");
936 *self = FutureState::PendingHeaders {
937 base_uri,
938 client,
939 request,
940 };
941 },
942 FutureState::PendingHeaders{base_uri, client, mut request} => {
943 trace!("polling headers");
944
945 match request.poll()? {
946 Async::Ready(response_headers) => {
947 let status = response_headers.status();
948 let headers = response_headers.headers().clone();
949 let response_has_json_content_type = headers.get(CONTENT_TYPE).map(|h| h.eq("application/json")).unwrap_or(false);
950 if status != StatusCode::OK {
951 let err = ProtocolError::NonOkResult(status);
952 return Err(Error::BodyParse(ParseError::Protocol(err)))
953 } else if !response_has_json_content_type {
954 let err = ProtocolError::ContentTypeNotJson;
955 return Err(Error::BodyParse(ParseError::Protocol(err)))
956 } else {
957 trace!("got headers {} {:?} => PendingBody", status, headers);
958 let body = BodyBuffer::new(response_headers.into_body());
959 *self = FutureState::PendingBody {
960 base_uri,
961 client,
962 headers,
963 body,
964 };
965 }
966 },
967 Async::NotReady => {
968 trace!("still no headers => PendingHeaders");
969 *self = FutureState::PendingHeaders {
970 base_uri,
971 client,
972 request,
973 };
974 return Ok(Async::NotReady);
975 },
976 }
977 },
978 FutureState::PendingBody{base_uri, client, headers, mut body} => {
979 trace!("polling body");
980
981 if let Async::Ready(body) = body.poll()? {
982 *self = FutureState::Done;
983 return Ok(Async::Ready(body));
984 } else {
985 *self = FutureState::PendingBody{
986 base_uri,
987 client,
988 headers,
989 body
990 };
991 return Ok(Async::NotReady);
992 }
993 }
994
995 FutureState::Working | FutureState::Done => {
997 return Err(Error::InvalidState);
998 },
999 }
1000 }
1001 }
1002}
1003
1004#[derive(Deserialize)]
1005struct InnerAgent {
1006 #[serde(rename = "Member")]
1007 member: InnerMember,
1008}
1009
1010#[derive(Deserialize)]
1011struct InnerMember {
1012 #[serde(rename = "Addr")]
1013 addr: IpAddr,
1014}
1015
1016#[derive(Debug)]
1018pub struct Agent {
1019 pub member_address: IpAddr,
1021}
1022
1023impl ConsulType for Agent {
1024 type Reply = Agent;
1025
1026 fn parse(buf: &Chunk) -> Result<Self::Reply, ParseError> {
1027 let agent: InnerAgent = serde_json::from_slice(&buf).map_err(ParseError::BodyParsing)?;
1028 Ok(Agent {
1029 member_address: agent.member.addr,
1030 })
1031 }
1032}
1033