use std::time::SystemTime;
use failure::{Fail};
use futures::{Async, Sink, AsyncSink};
use futures::future::{FutureResult, ok};
use futures::sync::oneshot::{channel, Sender};
use serde_json::from_slice;
use tk_http::client as http;
use tk_http::{Status, Version};
use {Connection, ResponseFuture};
use errors::BadResponse;
use response;
#[derive(Debug, Serialize, Deserialize)]
pub struct Peer {
pub id: String,
pub hostname: String,
pub name: String,
pub primary_addr: Option<String>,
pub addresses: Vec<String>,
#[serde(with="::serde_millis")]
pub known_since: SystemTime,
#[serde(with="::serde_millis", default)]
pub last_report: Option<SystemTime>,
#[serde(with="::serde_millis", default)]
pub probe_time: Option<SystemTime>,
#[serde(with="::serde_millis", default)]
pub last_report_direct: Option<SystemTime>,
#[serde(skip)]
_non_exhaustive: (),
}
#[derive(Debug)]
pub struct PeersResponse {
pub requested: SystemTime,
pub received: SystemTime,
pub peers: Vec<Peer>,
}
struct PeersCodec {
request_time: SystemTime,
tx: Option<Sender<PeersResponse>>,
}
impl Connection {
pub fn get_peers(&self) -> ResponseFuture<PeersResponse> {
let (tx, rx) = channel();
let pcodec = PeersCodec {
request_time: SystemTime::now(),
tx: Some(tx),
};
match self.pool.clone().start_send(Box::new(pcodec)) {
Ok(AsyncSink::NotReady(_)) => response::not_connected(),
Ok(AsyncSink::Ready) => response::from_channel(rx),
Err(_send_error) => response::not_connected(),
}
}
}
impl<S> http::Codec<S> for PeersCodec {
type Future = FutureResult<http::EncoderDone<S>, http::Error>;
fn start_write(&mut self, mut e: http::Encoder<S>) -> Self::Future {
e.request_line("GET", "/all_peers.json", Version::Http11);
e.done_headers().unwrap();
ok(e.done())
}
fn headers_received(&mut self, headers: &http::Head)
-> Result<http::RecvMode, http::Error>
{
if headers.status() != Some(Status::Ok) {
return Err(http::Error::custom(
BadResponse::Status(headers.status()).compat()));
}
Ok(http::RecvMode::buffered(10_485_760))
}
fn data_received(&mut self, data: &[u8], end: bool)
-> Result<Async<usize>, http::Error>
{
#[derive(Debug, Deserialize, Serialize)]
pub struct Response {
peers: Vec<Peer>,
}
debug_assert!(end);
let decoded: Response = match from_slice(data) {
Ok(data) => data,
Err(e) => {
error!("Error decoding peers data: {}", e);
drop(self.tx.take().expect("sender is still alive"));
return Ok(Async::Ready(data.len()));
}
};
self.tx.take().expect("sender is still alive").send(PeersResponse {
requested: self.request_time,
received: SystemTime::now(),
peers: decoded.peers,
}).map_err(|_| {
debug!("Can't send response for peers request, oneshot is closed");
}).ok();
return Ok(Async::Ready(data.len()));
}
}