1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use std::time::SystemTime;

use failure::{Fail};
use futures::{Async, Sink, AsyncSink};
use futures::future::{FutureResult, ok};
use futures::sync::oneshot::{channel, Sender};
use serde_json::from_slice;
use tk_http::client as http;
use tk_http::{Status, Version};

use {Connection, ResponseFuture};
use errors::BadResponse;
use response;


/// Info about the peer
///
/// We currently include only a subset of data reported by cantal here.
/// Mostly things that are unlikely to change in future. This will be fixed
/// when cantal grows stable API.
#[derive(Debug, Serialize, Deserialize)]
pub struct Peer {
    /// Host identifier (machine-id)
    pub id: String,
    /// Hostname of the host
    pub hostname: String,
    /// Name of the host, usually FQDN
    pub name: String,
    /// Primary IP address (which works of pings, etc)
    pub primary_addr: Option<String>,
    /// The list of all IP addresses of the host
    pub addresses: Vec<String>,

    /// Time when peer became known to this host
    #[serde(with="::serde_millis")]
    pub known_since: SystemTime,

    /// Time of last report across the network
    #[serde(with="::serde_millis", default)]
    pub last_report: Option<SystemTime>,

    /// Last time probe (ping) sent
    ///
    /// This is useful to check if last_report is too outdated
    #[serde(with="::serde_millis", default)]
    pub probe_time: Option<SystemTime>,

    /// Last report directly to this host
    #[serde(with="::serde_millis", default)]
    pub last_report_direct: Option<SystemTime>,

    #[serde(skip)]
    _non_exhaustive: (),
}

/// A response to the `get_peers()` request
#[derive(Debug)]
pub struct PeersResponse {
    /// A timestamp when `get_peers()` wasa issued
    pub requested: SystemTime,
    /// A timestamp when response to the request was received
    pub received: SystemTime,
    /// Actual list of peer data
    pub peers: Vec<Peer>,
}


struct PeersCodec {
    request_time: SystemTime,
    tx: Option<Sender<PeersResponse>>,
}

impl Connection {
    /// Start a request that returns a list of peers known to cantal
    pub fn get_peers(&self) -> ResponseFuture<PeersResponse> {
        let (tx, rx) = channel();
        let pcodec = PeersCodec {
            request_time: SystemTime::now(),
            tx: Some(tx),
        };
        match self.pool.clone().start_send(Box::new(pcodec)) {
            Ok(AsyncSink::NotReady(_)) => response::not_connected(),
            Ok(AsyncSink::Ready) => response::from_channel(rx),
            Err(_send_error) => response::not_connected(),
        }
    }
}

impl<S> http::Codec<S> for PeersCodec {
    type Future = FutureResult<http::EncoderDone<S>, http::Error>;

    fn start_write(&mut self, mut e: http::Encoder<S>) -> Self::Future {
        e.request_line("GET", "/all_peers.json", Version::Http11);
        e.done_headers().unwrap();
        ok(e.done())
    }
    fn headers_received(&mut self, headers: &http::Head)
        -> Result<http::RecvMode, http::Error>
    {
        if headers.status() != Some(Status::Ok) {
            return Err(http::Error::custom(
                BadResponse::Status(headers.status()).compat()));
        }
        Ok(http::RecvMode::buffered(10_485_760))
    }
    fn data_received(&mut self, data: &[u8], end: bool)
        -> Result<Async<usize>, http::Error>
    {
        #[derive(Debug, Deserialize, Serialize)]
        pub struct Response {
            peers: Vec<Peer>,
        }

        debug_assert!(end);
        let decoded: Response = match from_slice(data) {
            Ok(data) => data,
            Err(e) => {
                error!("Error decoding peers data: {}", e);
                drop(self.tx.take().expect("sender is still alive"));
                return Ok(Async::Ready(data.len()));
            }
        };
        self.tx.take().expect("sender is still alive").send(PeersResponse {
            requested: self.request_time,
            received: SystemTime::now(),
            peers: decoded.peers,
        }).map_err(|_| {
            debug!("Can't send response for peers request, oneshot is closed");
        }).ok();
        return Ok(Async::Ready(data.len()));
    }
}