pub mod auth;
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::hash::BuildHasher;
use std::thread;
use std::time::{Duration, Instant};
use futures::Stream;
use hyper::client::{Client, HttpConnector};
use hyper::header::{self, HeaderValue};
use hyper::{Body, Request, Response, Uri};
use hyper_tls::HttpsConnector;
use json;
use json::Value;
use tokio_core::reactor::Core;
use self::auth::OAuth;
use errors::RedditError;
use failure::Error;
#[derive(Copy, Clone)]
pub enum LimitMethod {
Steady,
Burst,
}
pub struct Connection {
pub auth: Option<auth::OAuth>,
pub useragent: HeaderValue,
pub client: Client<HttpsConnector<HttpConnector>, Body>,
core: RefCell<Core>,
pub limit: Cell<LimitMethod>,
reqs: Cell<i32>,
remaining: Cell<Option<i32>>,
reset_time: Cell<Instant>,
}
impl Connection {
pub fn new(appname: &str, appversion: &str, appauthor: &str) -> Result<Connection, Error> {
let useragent = HeaderValue::from_str(&format!("linux:{}:{} (by {})", appname, appversion, appauthor)).unwrap();
let core = Core::new()?;
let client = Client::builder().build(HttpsConnector::new(1)?);
Ok(Connection {
auth: None,
useragent,
client,
core: RefCell::new(core),
limit: Cell::new(LimitMethod::Steady),
reqs: Cell::new(0),
remaining: Cell::new(None),
reset_time: Cell::new(Instant::now()),
})
}
pub fn run_request(&self, mut req: Request<Body>) -> Result<Value, Error> {
let req_str = format!("{:?}", req);
match self.limit.get() {
LimitMethod::Steady => {
if let Some(remaining) = self.remaining.get() {
if Instant::now() < self.reset_time.get() {
trace!("Ratelimiting in steady mode for {:?}", self.reset_time.get() - Instant::now());
thread::sleep((self.reset_time.get() - Instant::now()).checked_div(remaining as u32).unwrap());
}
}
}
LimitMethod::Burst => {
if let Some(remaining) = self.remaining.get() {
if remaining <= 0 && self.reset_time.get() > Instant::now() {
trace!("Ratelimiting in burst mode for {:?}", self.reset_time.get() - Instant::now());
thread::sleep(self.reset_time.get() - Instant::now());
}
}
}
};
req.headers_mut().insert(header::USER_AGENT, self.useragent.clone());
trace!("Sending request {:?}", req);
let response = self.client.request(req);
let response = self.core.borrow_mut().run(response)?;
if let Some(reqs_used) = response.headers().get("x-ratelimit-used") {
let reqs_used = reqs_used.to_str().unwrap().parse::<f32>().unwrap().round() as i32;
trace!("Used {} of requests in ratelimit period", reqs_used);
self.reqs.set(reqs_used);
}
if let Some(reqs_remaining) = response.headers().get("x-ratelimit-remaining") {
let reqs_remaining = reqs_remaining.to_str().unwrap().parse::<f32>().unwrap().round() as i32;
trace!("Have {} requests remaining in ratelimit period", reqs_remaining);
self.remaining.set(Some(reqs_remaining));
}
if let Some(secs_remaining) = response.headers().get("x-ratelimit-reset") {
let secs_remaining = secs_remaining.to_str().unwrap().parse::<f32>().unwrap().round() as u64;
trace!("Have {} seconds remaining to ratelimit reset", secs_remaining);
self.reset_time.set(Instant::now() + Duration::new(secs_remaining, 0));
}
trace!("Ratelimiting:\n\tRequests used: {:?}\n\tRequests remaining: {:?}\n\tReset time: {:?}\n\tNow: {:?}", self.reqs.get(), self.remaining.get(), self.reset_time.get(), Instant::now());
let response_str = format!("{:?}", response);
let get_body = |response: Response<Body>| -> Result<String, Error> {
let body = self.core.borrow_mut().run(response.into_body().concat2())?;
let body: String = String::from_utf8_lossy(&body).into();
Ok(body)
};
if !response.status().is_success() {
error!("Got error response: {}", response_str);
return Err(Error::from(RedditError::BadRequest {
request: req_str,
response: format!("Reponse: {}\nResponse body: {:?}", response_str, get_body(response)?),
}));
}
let body = get_body(response)?;
match json::from_str(&body) {
Ok(r) => {
trace!("Got successful response: {:?}\nBody: {}", response_str, body);
Ok(r)
}
Err(_) => Err(Error::from(RedditError::BadResponse { request: req_str, response: body })),
}
}
pub fn run_auth_request(&self, mut req: Request<Body>) -> Result<Value, Error> {
if let Some(ref auth) = self.auth {
let req_str = format!("{:?}", req);
req.headers_mut().insert(
header::AUTHORIZATION,
HeaderValue::from_str(&format!(
"Bearer {}",
match *auth {
OAuth::Script {
id: ref _id,
secret: ref _secret,
username: ref _username,
password: ref _password,
ref token,
} => token.to_string(),
OAuth::InstalledApp {
id: ref _id,
redirect: ref _redirect,
ref token,
ref refresh_token,
ref expire_instant,
} => {
if let (Some(_refresh_token), Some(expire_instant)) = (refresh_token.borrow().clone(), expire_instant.get()) {
if Instant::now() > expire_instant {
auth.refresh(self)?;
}
token.borrow().to_string()
} else if let Some(expire_instant) = expire_instant.get() {
if Instant::now() > expire_instant {
return Err(Error::from(RedditError::Forbidden { request: format!("{:?}", req_str) }));
} else {
token.borrow().to_string()
}
} else {
token.borrow().to_string()
}
}
}
))
.unwrap(),
);
self.run_request(req)
} else {
Err(Error::from(RedditError::Forbidden { request: format!("{:?}", req) }))
}
}
pub fn set_limit(&self, limit: LimitMethod) {
self.limit.set(limit);
}
pub fn get_core(&self) -> &RefCell<Core> {
&self.core
}
}
pub fn body_from_map<S: BuildHasher>(map: &HashMap<&str, &str, S>) -> Body {
let mut body_str = String::new();
for (i, item) in map.iter().enumerate() {
body_str.push_str(&format!("{}={}{}", item.0, item.1, if i < map.len() - 1 { "&" } else { "" }));
}
trace!("Setup body: \n{}\n", body_str);
Body::from(body_str)
}
pub fn uri_params_from_map<S: BuildHasher>(url: &str, map: &HashMap<&str, &str, S>) -> Result<Uri, Error> {
use url::Url;
Ok(Url::parse_with_params(url, map)?.to_string().parse()?)
}