1use std::time::SystemTime;
2
3use failure::{Fail};
4use futures::{Async, Sink, AsyncSink};
5use futures::future::{FutureResult, ok};
6use futures::sync::oneshot::{channel, Sender};
7use serde_json::from_slice;
8use tk_http::client as http;
9use tk_http::{Status, Version};
10
11use {Connection, ResponseFuture};
12use errors::BadResponse;
13use response;
14
15
/// A single entry of the `peers` array decoded from the `/all_peers.json`
/// response.
///
/// NOTE(review): the per-field descriptions below are inferred from the
/// field names and types only; confirm them against the server-side schema.
#[derive(Debug, Serialize, Deserialize)]
pub struct Peer {
    /// Identifier of the peer (presumably unique per peer -- TODO confirm).
    pub id: String,
    /// Hostname reported for the peer.
    pub hostname: String,
    /// Name of the peer (how it differs from `hostname` is not visible
    /// here -- TODO confirm).
    pub name: String,
    /// Primary address of the peer, if there is one.
    pub primary_addr: Option<String>,
    /// Addresses associated with the peer (presumably all known ones).
    pub addresses: Vec<String>,

    /// Timestamp (de)serialized through the `serde_millis` module.
    /// Presumably the moment the peer first became known. This field is
    /// required: it has no `default`.
    #[serde(with="::serde_millis")]
    pub known_since: SystemTime,

    /// Optional timestamp (de)serialized through `serde_millis`; because of
    /// `default`, it is `None` when the field is missing from the input.
    /// Presumably the time of the latest report received from the peer.
    #[serde(with="::serde_millis", default)]
    pub last_report: Option<SystemTime>,

    /// Optional timestamp (de)serialized through `serde_millis`; `None`
    /// when missing from the input. Presumably the time the peer was last
    /// probed -- TODO confirm exact semantics.
    #[serde(with="::serde_millis", default)]
    pub probe_time: Option<SystemTime>,

    /// Optional timestamp (de)serialized through `serde_millis`; `None`
    /// when missing from the input. Presumably the time of the latest
    /// report received directly from the peer -- TODO confirm.
    #[serde(with="::serde_millis", default)]
    pub last_report_direct: Option<SystemTime>,

    // Private unit field, skipped by serde. Because it is private, code
    // outside this module cannot build `Peer` with a struct literal or
    // destructure it exhaustively, which leaves room to add fields later.
    #[serde(skip)]
    _non_exhaustive: (),
}
55
/// Result of a peers request: the decoded peer list together with the
/// local timestamps taken around the request.
#[derive(Debug)]
pub struct PeersResponse {
    /// Local time captured when the request was created (copied from the
    /// codec's `request_time`).
    pub requested: SystemTime,
    /// Local time captured right after the response body was decoded.
    pub received: SystemTime,
    /// Peers decoded from the `peers` field of the response body.
    pub peers: Vec<Peer>,
}
66
67
/// Per-request state of the HTTP client codec that fetches the peer list.
struct PeersCodec {
    // Local time at which the request was created; forwarded into
    // `PeersResponse::requested`.
    request_time: SystemTime,
    // Sending half of the oneshot channel used to deliver the result.
    // Wrapped in `Option` so it can be moved out with `take()` when the
    // response body arrives.
    tx: Option<Sender<PeersResponse>>,
}
72
73impl Connection {
74 pub fn get_peers(&self) -> ResponseFuture<PeersResponse> {
76 let (tx, rx) = channel();
77 let pcodec = PeersCodec {
78 request_time: SystemTime::now(),
79 tx: Some(tx),
80 };
81 match self.pool.clone().start_send(Box::new(pcodec)) {
82 Ok(AsyncSink::NotReady(_)) => response::not_connected(),
83 Ok(AsyncSink::Ready) => response::from_channel(rx),
84 Err(_send_error) => response::not_connected(),
85 }
86 }
87}
88
89impl<S> http::Codec<S> for PeersCodec {
90 type Future = FutureResult<http::EncoderDone<S>, http::Error>;
91
92 fn start_write(&mut self, mut e: http::Encoder<S>) -> Self::Future {
93 e.request_line("GET", "/all_peers.json", Version::Http11);
94 e.done_headers().unwrap();
95 ok(e.done())
96 }
97 fn headers_received(&mut self, headers: &http::Head)
98 -> Result<http::RecvMode, http::Error>
99 {
100 if headers.status() != Some(Status::Ok) {
101 return Err(http::Error::custom(
102 BadResponse::Status(headers.status()).compat()));
103 }
104 Ok(http::RecvMode::buffered(10_485_760))
105 }
106 fn data_received(&mut self, data: &[u8], end: bool)
107 -> Result<Async<usize>, http::Error>
108 {
109 #[derive(Debug, Deserialize, Serialize)]
110 pub struct Response {
111 peers: Vec<Peer>,
112 }
113
114 debug_assert!(end);
115 let decoded: Response = match from_slice(data) {
116 Ok(data) => data,
117 Err(e) => {
118 error!("Error decoding peers data: {}", e);
119 drop(self.tx.take().expect("sender is still alive"));
120 return Ok(Async::Ready(data.len()));
121 }
122 };
123 self.tx.take().expect("sender is still alive").send(PeersResponse {
124 requested: self.request_time,
125 received: SystemTime::now(),
126 peers: decoded.peers,
127 }).map_err(|_| {
128 debug!("Can't send response for peers request, oneshot is closed");
129 }).ok();
130 return Ok(Async::Ready(data.len()));
131 }
132}