1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use std::time::SystemTime;
use failure::{Fail};
use futures::{Async, Sink, AsyncSink};
use futures::future::{FutureResult, ok};
use futures::sync::oneshot::{channel, Sender};
use serde_json::from_slice;
use tk_http::client as http;
use tk_http::{Status, Version};
use {Connection, ResponseFuture};
use errors::BadResponse;
use response;
#[derive(Debug, Serialize, Deserialize)]
pub struct Peer {
pub id: String,
pub hostname: String,
pub name: String,
pub primary_addr: Option<String>,
pub addresses: Vec<String>,
#[serde(with="::serde_millis")]
pub known_since: SystemTime,
#[serde(with="::serde_millis", default)]
pub last_report: Option<SystemTime>,
#[serde(with="::serde_millis", default)]
pub probe_time: Option<SystemTime>,
#[serde(with="::serde_millis", default)]
pub last_report_direct: Option<SystemTime>,
#[serde(skip)]
_non_exhaustive: (),
}
#[derive(Debug)]
pub struct PeersResponse {
pub requested: SystemTime,
pub received: SystemTime,
pub peers: Vec<Peer>,
}
struct PeersCodec {
request_time: SystemTime,
tx: Option<Sender<PeersResponse>>,
}
impl Connection {
pub fn get_peers(&self) -> ResponseFuture<PeersResponse> {
let (tx, rx) = channel();
let pcodec = PeersCodec {
request_time: SystemTime::now(),
tx: Some(tx),
};
match self.pool.clone().start_send(Box::new(pcodec)) {
Ok(AsyncSink::NotReady(_)) => response::not_connected(),
Ok(AsyncSink::Ready) => response::from_channel(rx),
Err(_send_error) => response::not_connected(),
}
}
}
impl<S> http::Codec<S> for PeersCodec {
type Future = FutureResult<http::EncoderDone<S>, http::Error>;
fn start_write(&mut self, mut e: http::Encoder<S>) -> Self::Future {
e.request_line("GET", "/all_peers.json", Version::Http11);
e.done_headers().unwrap();
ok(e.done())
}
fn headers_received(&mut self, headers: &http::Head)
-> Result<http::RecvMode, http::Error>
{
if headers.status() != Some(Status::Ok) {
return Err(http::Error::custom(
BadResponse::Status(headers.status()).compat()));
}
Ok(http::RecvMode::buffered(10_485_760))
}
fn data_received(&mut self, data: &[u8], end: bool)
-> Result<Async<usize>, http::Error>
{
#[derive(Debug, Deserialize, Serialize)]
pub struct Response {
peers: Vec<Peer>,
}
debug_assert!(end);
let decoded: Response = match from_slice(data) {
Ok(data) => data,
Err(e) => {
error!("Error decoding peers data: {}", e);
drop(self.tx.take().expect("sender is still alive"));
return Ok(Async::Ready(data.len()));
}
};
self.tx.take().expect("sender is still alive").send(PeersResponse {
requested: self.request_time,
received: SystemTime::now(),
peers: decoded.peers,
}).map_err(|_| {
debug!("Can't send response for peers request, oneshot is closed");
}).ok();
return Ok(Async::Ready(data.len()));
}
}