use failure::Error;
use futures::future::{Future, IntoFuture};
use futures::stream::{unfold, Stream};
use lru_time_cache::LruCache;
use reqwest::header::{Authorization, Bearer, Headers, Raw};
use reqwest::unstable::async::{Chunk, Client};
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use serde_json;
use std::str::from_utf8;
use std::sync::Mutex;
use std::time::Duration;
pub mod graphql {
#![cfg_attr(feature = "cargo-clippy", allow(missing_docs_in_private_items))]
use chrono::{DateTime, Utc};
#[derive(Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct Connection<T> {
pub nodes: Vec<T>,
#[serde(default)]
pub page_info: PageInfo,
}
#[derive(Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct PageInfo {
pub end_cursor: String,
pub has_next_page: bool,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Reply {
pub data: Data,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Data {
pub repository: Repository,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Repository {
pub pull_requests: Connection<PullRequest>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PullRequest {
pub author: Actor,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub mergeable: MergeableState,
pub number: u32,
pub title: String,
pub additions: u32,
pub deletions: u32,
pub base_ref_name: String,
pub head_ref_name: String,
pub body: String,
pub labels: Connection<Label>,
pub commits: Connection<PullRequestCommit>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Actor {
pub login: String,
}
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Label {
pub name: String,
pub color: String,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PullRequestCommit {
pub commit: Commit,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Commit {
pub status: Option<Status>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Status {
pub contexts: Vec<StatusContext>,
}
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StatusContext {
pub context: String,
pub description: String,
pub target_url: String,
pub state: StatusState,
}
#[derive(Deserialize, Serialize, PartialEq, Eq, Clone, Copy)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum MergeableState {
Unknown,
Mergeable,
Conflicting,
}
#[derive(Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum StatusState {
Expected,
Error,
Failure,
Pending,
Success,
}
}
/// Request payload for the GitHub GraphQL endpoint.
///
/// `query_single_page` builds one of these per page and hands it to
/// `send_github_query`, which serializes it as the JSON request body.
#[derive(Serialize)]
struct Request<'variables> {
/// GraphQL query text.
query: &'static str,
/// Values for the variables of the query.
variables: Variables<'variables>,
}
/// Variables sent alongside the GraphQL query in a `Request`.
#[derive(Serialize)]
struct Variables<'variables> {
/// Repository owner.
owner: &'variables str,
/// Repository name.
repo: &'variables str,
/// Pagination cursor; `None` when requesting the first page.
after: Option<&'variables str>,
}
/// URL that `send_github_query` posts every request to.
const GITHUB_ENDPOINT: &str = "https://api.github.com/graphql";
/// Query text sent in every `Request`.
///
/// `include!` parses `github.gql` as a Rust expression, so that file has to
/// evaluate to a `&str` (presumably a string literal holding the query).
const QUERY: &str = include!("github.gql");
/// Key under which a raw response body is stored in `CACHE`.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub(super) enum CacheKey {
/// A pull request list page, identified by its `after` cursor
/// (`None` for the first page).
List(Option<Box<str>>),
/// Not constructed in this file; presumably a per-pull-request timeline
/// keyed by PR number — confirm against the caller.
Timeline(u32),
}
/// Derives the cache key for a pull request list request.
///
/// Only the `after` cursor takes part in the key; `owner` and `repo` are not
/// included, so requests for different repositories map to the same entry.
/// NOTE(review): fine if the process only ever queries a single repository —
/// confirm.
impl<'a, 'v: 'a> From<&'a Request<'v>> for CacheKey {
    fn from(req: &'a Request<'v>) -> Self {
        let cursor = match req.variables.after {
            Some(after) => Some(String::from(after).into_boxed_str()),
            None => None,
        };
        CacheKey::List(cursor)
    }
}
lazy_static! {
/// Process-wide cache of raw response bodies, keyed by `CacheKey`.
/// Entries expire after 120 seconds; access is serialized by the mutex.
static ref CACHE: Mutex<LruCache<CacheKey, Chunk>> =
Mutex::new(LruCache::with_expiry_duration(Duration::from_secs(120)));
}
/// Progress of the page-by-page walk over the pull request list.
enum PaginationState {
    /// Nothing has been fetched yet; the next request carries no cursor.
    First,
    /// At least one page was fetched; holds the cursor for the next request.
    HasNext(String),
    /// The last page has been fetched.
    Done,
}

impl PaginationState {
    /// Returns `true` only for `Done`.
    fn is_done(&self) -> bool {
        if let PaginationState::Done = *self {
            true
        } else {
            false
        }
    }

    /// Returns the cursor to send as `after`, which only `HasNext` carries.
    fn as_after(&self) -> Option<&str> {
        if let PaginationState::HasNext(ref cursor) = *self {
            Some(cursor.as_str())
        } else {
            None
        }
    }
}
/// Fetches every pull request of `owner/repo`, following pagination.
///
/// Pages are requested one after another, starting without a cursor and
/// continuing until a page reports that no further page exists. The returned
/// future resolves to the concatenation of all pages, or to the first error.
pub fn query(
    client: Client,
    token: String,
    owner: String,
    repo: String,
) -> Box<Future<Item = Vec<graphql::PullRequest>, Error = Error>> {
    let pages = unfold(PaginationState::First, move |state| {
        // Returning `None` ends the stream once the last page was consumed.
        if state.is_done() {
            return None;
        }
        let page = query_single_page(&client, &token, &owner, &repo, state.as_after());
        Some(page)
    });
    Box::new(pages.concat2())
}
pub(super) fn send_github_query<'a, R, T>(
client: &Client,
token: &str,
request: &'a T,
) -> Box<Future<Item = R, Error = Error>>
where
R: DeserializeOwned + 'static,
T: Serialize + ?Sized + 'a,
&'a T: Into<CacheKey>,
{
info!("Preparing to send GitHub request");
let cache_key = request.into();
{
let cache_read_guard = CACHE.lock().expect("poisoned");
if let Some(body) = cache_read_guard.peek(&cache_key) {
info!("Obtained cached response");
return Box::new(
serde_json::from_slice(body)
.map_err(Error::from)
.into_future(),
);
}
}
Box::new(
client
.post(GITHUB_ENDPOINT)
.header(Authorization(Bearer {
token: token.to_owned(),
}))
.json(request)
.send()
.and_then(|response| response.error_for_status())
.inspect(|response| {
let headers = response.headers();
let rate_limit_remaining = fetch_rate_limit(headers, "X-RateLimit-Remaining");
let rate_limit_limit = fetch_rate_limit(headers, "X-RateLimit-Limit");
info!(
"GitHub rate limit: {}/{}",
rate_limit_remaining, rate_limit_limit
);
})
.and_then(|response| response.into_body().concat2())
.map_err(Error::from)
.and_then(|body| {
let mut cache_write_guard = CACHE.lock().expect("poisoned");
let ret = serde_json::from_slice(&body).map_err(Error::from);
cache_write_guard.insert(cache_key, body);
ret.map_err(Error::from)
}),
)
}
/// Requests a single page of pull requests.
///
/// `after` is the cursor returned with the previous page, or `None` for the
/// first page. The future resolves to the page's pull requests together with
/// the state from which the following page (if any) has to be requested.
fn query_single_page(
    client: &Client,
    token: &str,
    owner: &str,
    repo: &str,
    after: Option<&str>,
) -> Box<Future<Item = (Vec<graphql::PullRequest>, PaginationState), Error = Error>> {
    let request = Request {
        query: QUERY,
        variables: Variables { owner, repo, after },
    };
    let page = send_github_query(client, token, &request).map(|reply: graphql::Reply| {
        let graphql::Connection { nodes, page_info } = reply.data.repository.pull_requests;
        // The cursor only matters when GitHub announces another page.
        let following = if page_info.has_next_page {
            PaginationState::HasNext(page_info.end_cursor)
        } else {
            PaginationState::Done
        };
        info!(
            "Obtained {} PRs from GitHub, has next page = {}",
            nodes.len(),
            !following.is_done()
        );
        (nodes, following)
    });
    Box::new(page)
}
fn fetch_rate_limit(headers: &Headers, name: &str) -> u32 {
headers
.get_raw(name)
.and_then(Raw::one)
.and_then(|one| from_utf8(one).ok())
.and_then(|string| string.parse().ok())
.unwrap_or(0)
}