tk_cantal/
peers.rs

1use std::time::SystemTime;
2
3use failure::{Fail};
4use futures::{Async, Sink, AsyncSink};
5use futures::future::{FutureResult, ok};
6use futures::sync::oneshot::{channel, Sender};
7use serde_json::from_slice;
8use tk_http::client as http;
9use tk_http::{Status, Version};
10
11use {Connection, ResponseFuture};
12use errors::BadResponse;
13use response;
14
15
16/// Info about the peer
17///
18/// We currently include only a subset of data reported by cantal here.
19/// Mostly things that are unlikely to change in future. This will be fixed
20/// when cantal grows stable API.
21#[derive(Debug, Serialize, Deserialize)]
22pub struct Peer {
23    /// Host identifier (machine-id)
24    pub id: String,
25    /// Hostname of the host
26    pub hostname: String,
27    /// Name of the host, usually FQDN
28    pub name: String,
29    /// Primary IP address (which works of pings, etc)
30    pub primary_addr: Option<String>,
31    /// The list of all IP addresses of the host
32    pub addresses: Vec<String>,
33
34    /// Time when peer became known to this host
35    #[serde(with="::serde_millis")]
36    pub known_since: SystemTime,
37
38    /// Time of last report across the network
39    #[serde(with="::serde_millis", default)]
40    pub last_report: Option<SystemTime>,
41
42    /// Last time probe (ping) sent
43    ///
44    /// This is useful to check if last_report is too outdated
45    #[serde(with="::serde_millis", default)]
46    pub probe_time: Option<SystemTime>,
47
48    /// Last report directly to this host
49    #[serde(with="::serde_millis", default)]
50    pub last_report_direct: Option<SystemTime>,
51
52    #[serde(skip)]
53    _non_exhaustive: (),
54}
55
56/// A response to the `get_peers()` request
57#[derive(Debug)]
58pub struct PeersResponse {
59    /// A timestamp when `get_peers()` wasa issued
60    pub requested: SystemTime,
61    /// A timestamp when response to the request was received
62    pub received: SystemTime,
63    /// Actual list of peer data
64    pub peers: Vec<Peer>,
65}
66
67
68struct PeersCodec {
69    request_time: SystemTime,
70    tx: Option<Sender<PeersResponse>>,
71}
72
73impl Connection {
74    /// Start a request that returns a list of peers known to cantal
75    pub fn get_peers(&self) -> ResponseFuture<PeersResponse> {
76        let (tx, rx) = channel();
77        let pcodec = PeersCodec {
78            request_time: SystemTime::now(),
79            tx: Some(tx),
80        };
81        match self.pool.clone().start_send(Box::new(pcodec)) {
82            Ok(AsyncSink::NotReady(_)) => response::not_connected(),
83            Ok(AsyncSink::Ready) => response::from_channel(rx),
84            Err(_send_error) => response::not_connected(),
85        }
86    }
87}
88
89impl<S> http::Codec<S> for PeersCodec {
90    type Future = FutureResult<http::EncoderDone<S>, http::Error>;
91
92    fn start_write(&mut self, mut e: http::Encoder<S>) -> Self::Future {
93        e.request_line("GET", "/all_peers.json", Version::Http11);
94        e.done_headers().unwrap();
95        ok(e.done())
96    }
97    fn headers_received(&mut self, headers: &http::Head)
98        -> Result<http::RecvMode, http::Error>
99    {
100        if headers.status() != Some(Status::Ok) {
101            return Err(http::Error::custom(
102                BadResponse::Status(headers.status()).compat()));
103        }
104        Ok(http::RecvMode::buffered(10_485_760))
105    }
106    fn data_received(&mut self, data: &[u8], end: bool)
107        -> Result<Async<usize>, http::Error>
108    {
109        #[derive(Debug, Deserialize, Serialize)]
110        pub struct Response {
111            peers: Vec<Peer>,
112        }
113
114        debug_assert!(end);
115        let decoded: Response = match from_slice(data) {
116            Ok(data) => data,
117            Err(e) => {
118                error!("Error decoding peers data: {}", e);
119                drop(self.tx.take().expect("sender is still alive"));
120                return Ok(Async::Ready(data.len()));
121            }
122        };
123        self.tx.take().expect("sender is still alive").send(PeersResponse {
124            requested: self.request_time,
125            received: SystemTime::now(),
126            peers: decoded.peers,
127        }).map_err(|_| {
128            debug!("Can't send response for peers request, oneshot is closed");
129        }).ok();
130        return Ok(Async::Ready(data.len()));
131    }
132}