use error::Error::*;
use error::{self, TwitterErrors};
use futures::{Async, Future, Poll, Stream};
use hyper::client::ResponseFuture;
use hyper::header::CONTENT_LENGTH;
use hyper::{self, Body, Request, StatusCode};
#[cfg(feature = "hyper-rustls")]
use hyper_rustls::HttpsConnector;
#[cfg(feature = "native-tls")]
use hyper_tls::HttpsConnector;
use serde;
use serde_json;
use std::iter::FromIterator;
use std::ops::{Deref, DerefMut};
use std::{io, mem, slice, vec};
use super::Headers;
const X_RATE_LIMIT_LIMIT: &'static str = "X-Rate-Limit-Limit";
const X_RATE_LIMIT_REMAINING: &'static str = "X-Rate-Limit-Remaining";
const X_RATE_LIMIT_RESET: &'static str = "X-Rate-Limit-Reset";
fn rate_limit(headers: &Headers, header: &'static str) -> Result<Option<i32>, error::Error> {
let val = headers.get(header);
if let Some(val) = val {
let val = val.to_str()?.parse::<i32>()?;
Ok(Some(val))
} else {
Ok(None)
}
}
fn rate_limit_limit(headers: &Headers) -> Result<Option<i32>, error::Error> {
rate_limit(headers, X_RATE_LIMIT_LIMIT)
}
fn rate_limit_remaining(headers: &Headers) -> Result<Option<i32>, error::Error> {
rate_limit(headers, X_RATE_LIMIT_REMAINING)
}
fn rate_limit_reset(headers: &Headers) -> Result<Option<i32>, error::Error> {
rate_limit(headers, X_RATE_LIMIT_RESET)
}
#[derive(Debug, Deserialize)]
pub struct Response<T> {
#[serde(rename = "limit")]
pub rate_limit: i32,
#[serde(rename = "remaining")]
pub rate_limit_remaining: i32,
#[serde(rename = "reset")]
pub rate_limit_reset: i32,
#[serde(default)]
pub response: T,
}
impl<T> Response<T> {
pub fn map<F, U>(src: Response<T>, fun: F) -> Response<U>
where
F: FnOnce(T) -> U,
{
Response {
rate_limit: src.rate_limit,
rate_limit_remaining: src.rate_limit_remaining,
rate_limit_reset: src.rate_limit_reset,
response: fun(src.response),
}
}
}
impl<T> Response<Vec<T>> {
pub fn iter(&self) -> ResponseIterRef<T> {
ResponseIterRef {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
resp_iter: self.response.iter(),
}
}
pub fn iter_mut(&mut self) -> ResponseIterMut<T> {
ResponseIterMut {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
resp_iter: self.response.iter_mut(),
}
}
}
impl<T> Deref for Response<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.response
}
}
impl<T> DerefMut for Response<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.response
}
}
pub struct ResponseIterRef<'a, T>
where
T: 'a,
{
rate_limit: i32,
rate_limit_remaining: i32,
rate_limit_reset: i32,
resp_iter: slice::Iter<'a, T>,
}
impl<'a, T> Iterator for ResponseIterRef<'a, T>
where
T: 'a,
{
type Item = Response<&'a T>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(resp) = self.resp_iter.next() {
Some(Response {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
response: resp,
})
} else {
None
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.resp_iter.size_hint()
}
}
impl<'a, T> DoubleEndedIterator for ResponseIterRef<'a, T>
where
T: 'a,
{
fn next_back(&mut self) -> Option<Self::Item> {
if let Some(resp) = self.resp_iter.next_back() {
Some(Response {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
response: resp,
})
} else {
None
}
}
}
impl<'a, T> ExactSizeIterator for ResponseIterRef<'a, T>
where
T: 'a,
{
fn len(&self) -> usize {
self.resp_iter.len()
}
}
impl<'a, T> IntoIterator for &'a Response<Vec<T>>
where
T: 'a,
{
type Item = Response<&'a T>;
type IntoIter = ResponseIterRef<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
pub struct ResponseIterMut<'a, T>
where
T: 'a,
{
rate_limit: i32,
rate_limit_remaining: i32,
rate_limit_reset: i32,
resp_iter: slice::IterMut<'a, T>,
}
impl<'a, T> Iterator for ResponseIterMut<'a, T>
where
T: 'a,
{
type Item = Response<&'a mut T>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(resp) = self.resp_iter.next() {
Some(Response {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
response: resp,
})
} else {
None
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.resp_iter.size_hint()
}
}
impl<'a, T> DoubleEndedIterator for ResponseIterMut<'a, T>
where
T: 'a,
{
fn next_back(&mut self) -> Option<Self::Item> {
if let Some(resp) = self.resp_iter.next_back() {
Some(Response {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
response: resp,
})
} else {
None
}
}
}
impl<'a, T> ExactSizeIterator for ResponseIterMut<'a, T>
where
T: 'a,
{
fn len(&self) -> usize {
self.resp_iter.len()
}
}
impl<'a, T> IntoIterator for &'a mut Response<Vec<T>>
where
T: 'a,
{
type Item = Response<&'a mut T>;
type IntoIter = ResponseIterMut<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}
pub struct ResponseIter<T> {
rate_limit: i32,
rate_limit_remaining: i32,
rate_limit_reset: i32,
resp_iter: vec::IntoIter<T>,
}
impl<T> Iterator for ResponseIter<T> {
type Item = Response<T>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(resp) = self.resp_iter.next() {
Some(Response {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
response: resp,
})
} else {
None
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.resp_iter.size_hint()
}
}
impl<T> DoubleEndedIterator for ResponseIter<T> {
fn next_back(&mut self) -> Option<Self::Item> {
if let Some(resp) = self.resp_iter.next_back() {
Some(Response {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
response: resp,
})
} else {
None
}
}
}
impl<T> ExactSizeIterator for ResponseIter<T> {
fn len(&self) -> usize {
self.resp_iter.len()
}
}
impl<T> IntoIterator for Response<Vec<T>> {
type Item = Response<T>;
type IntoIter = ResponseIter<T>;
fn into_iter(self) -> Self::IntoIter {
ResponseIter {
rate_limit: self.rate_limit,
rate_limit_remaining: self.rate_limit_remaining,
rate_limit_reset: self.rate_limit_reset,
resp_iter: self.response.into_iter(),
}
}
}
impl<T> FromIterator<Response<T>> for Response<Vec<T>> {
fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = Response<T>>,
{
let mut resp = Response {
rate_limit: -1,
rate_limit_remaining: -1,
rate_limit_reset: -1,
response: Vec::new(),
};
for item in iter {
if item.rate_limit_reset > resp.rate_limit_reset {
resp.rate_limit = item.rate_limit;
resp.rate_limit_remaining = item.rate_limit_remaining;
resp.rate_limit_reset = item.rate_limit_reset;
} else if (item.rate_limit_reset == resp.rate_limit_reset)
&& (item.rate_limit_remaining < resp.rate_limit_remaining)
{
resp.rate_limit = item.rate_limit;
resp.rate_limit_remaining = item.rate_limit_remaining;
resp.rate_limit_reset = item.rate_limit_reset;
}
resp.response.push(item.response);
}
resp
}
}
pub fn get_response(request: Request<Body>) -> Result<ResponseFuture, error::Error> {
#[cfg(feature = "native-tls")]
let connector = HttpsConnector::new(1)?;
#[cfg(feature = "hyper-rustls")]
let connector = HttpsConnector::new(1);
let client = hyper::Client::builder().build(connector);
Ok(client.request(request))
}
#[must_use = "futures do nothing unless polled"]
pub struct RawFuture {
request: Option<Request<Body>>,
response: Option<ResponseFuture>,
resp_headers: Option<Headers>,
resp_status: Option<StatusCode>,
body_stream: Option<Body>,
body: Vec<u8>,
}
impl RawFuture {
fn headers(&self) -> &Headers {
self.resp_headers.as_ref().unwrap()
}
}
impl Future for RawFuture {
type Item = String;
type Error = error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(req) = self.request.take() {
self.response = Some(get_response(req)?);
}
if let Some(mut resp) = self.response.take() {
match resp.poll() {
Err(e) => return Err(e.into()),
Ok(Async::NotReady) => {
self.response = Some(resp);
return Ok(Async::NotReady);
}
Ok(Async::Ready(resp)) => {
self.resp_headers = Some(resp.headers().clone());
self.resp_status = Some(resp.status());
if let Some(len) = resp.headers().get(CONTENT_LENGTH) {
if let Ok(len) = len.to_str() {
if let Ok(len) = len.parse::<usize>() {
self.body.reserve(len);
}
}
}
self.body_stream = Some(resp.into_body());
}
}
}
if let Some(mut resp) = self.body_stream.take() {
loop {
match resp.poll() {
Err(e) => return Err(e.into()),
Ok(Async::NotReady) => {
self.body_stream = Some(resp);
return Ok(Async::NotReady);
}
Ok(Async::Ready(None)) => break,
Ok(Async::Ready(Some(chunk))) => {
self.body.extend(&*chunk);
}
}
}
} else {
return Err(FutureAlreadyCompleted);
};
match String::from_utf8(mem::replace(&mut self.body, Vec::new())) {
Err(_) => Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
)
.into()),
Ok(resp) => {
if let Ok(err) = serde_json::from_str::<TwitterErrors>(&resp) {
if err.errors.iter().any(|e| e.code == 88)
&& self.headers().contains_key(X_RATE_LIMIT_RESET)
{
return Err(RateLimit(rate_limit_reset(self.headers())?.unwrap()));
} else {
return Err(TwitterError(err));
}
}
match self.resp_status.unwrap() {
st if st.is_success() => Ok(Async::Ready(resp)),
st => Err(BadStatus(st)),
}
}
}
}
}
pub fn make_raw_future(request: Request<Body>) -> RawFuture {
RawFuture {
request: Some(request),
response: None,
resp_headers: None,
resp_status: None,
body_stream: None,
body: Vec::new(),
}
}
#[must_use = "futures do nothing unless polled"]
pub struct TwitterFuture<T> {
request: RawFuture,
make_resp: fn(String, &Headers) -> Result<T, error::Error>,
}
impl<T> Future for TwitterFuture<T> {
type Item = T;
type Error = error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let full_resp = match self.request.poll() {
Err(e) => return Err(e),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(r)) => r,
};
Ok(Async::Ready((self.make_resp)(
full_resp,
self.request.headers(),
)?))
}
}
pub fn make_response<T: for<'a> serde::Deserialize<'a>>(
full_resp: String,
headers: &Headers,
) -> Result<Response<T>, error::Error> {
let out = serde_json::from_str(&full_resp)?;
Ok(Response::map(rate_headers(headers)?, |_| out))
}
pub fn make_future<T>(
request: Request<Body>,
make_resp: fn(String, &Headers) -> Result<T, error::Error>,
) -> TwitterFuture<T> {
TwitterFuture {
request: make_raw_future(request),
make_resp: make_resp,
}
}
pub fn make_parsed_future<T: for<'de> serde::Deserialize<'de>>(
request: Request<Body>,
) -> TwitterFuture<Response<T>> {
make_future(request, make_response)
}
pub fn rate_headers(resp: &Headers) -> Result<Response<()>, error::Error> {
Ok(Response {
rate_limit: rate_limit_limit(resp)?.unwrap_or(-1),
rate_limit_remaining: rate_limit_remaining(resp)?.unwrap_or(-1),
rate_limit_reset: rate_limit_reset(resp)?.unwrap_or(-1),
response: (),
})
}