#[cfg(not(target_arch = "wasm32"))]
use futures::future::BoxFuture;
#[cfg(not(target_arch = "wasm32"))]
use futures::prelude::*;
use futures::{future::try_join_all, try_join};
use reqwest::{header, Client as HttpClient, StatusCode, Url};
use serde::de::DeserializeOwned;
#[cfg(not(target_arch = "wasm32"))]
use std::collections::VecDeque;
use web_time::Duration;
use super::Error;
use crate::error::JsonDecodeError;
use crate::types::*;
use crate::util::*;
/// Asynchronous client for the crates.io API.
///
/// The `last_request_time` slot lives behind an `Arc`, so every clone of a
/// client shares it and the rate limit is enforced across all clones.
#[derive(Clone)]
pub struct Client {
// HTTP client used to send every request.
client: HttpClient,
// Minimum delay enforced between the start of two consecutive requests.
rate_limit: Duration,
// Start time of the most recent request, shared between clones. The async
// mutex also serializes requests, because it is held while a request runs.
last_request_time: std::sync::Arc<tokio::sync::Mutex<Option<web_time::Instant>>>,
// Root URL that all endpoint paths are joined onto.
base_url: Url,
}
/// Stream of crates matching a [`CratesQuery`], fetched page by page.
///
/// Only available on non-wasm32 targets.
#[cfg(not(target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(not(target_arch = "wasm32"))))]
pub struct CrateStream {
// Client used to fetch each page.
client: Client,
// Query for the next page to request; its `page` field is advanced after
// every fetch is started.
filter: CratesQuery,
// Set once an empty page or an error was seen; the stream then yields `None`.
closed: bool,
// Crates from the last fetched page that have not been yielded yet.
items: VecDeque<Crate>,
// In-flight page request, if any.
next_page_fetch: Option<BoxFuture<'static, Result<CratesPage, Error>>>,
}
#[cfg(not(target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(not(target_arch = "wasm32"))))]
impl CrateStream {
fn new(client: Client, filter: CratesQuery) -> Self {
Self {
client,
filter,
closed: false,
items: VecDeque::new(),
next_page_fetch: None,
}
}
}
#[cfg(not(target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(not(target_arch = "wasm32"))))]
impl futures::stream::Stream for CrateStream {
type Item = Result<Crate, Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let inner = self.get_mut();
if inner.closed {
return std::task::Poll::Ready(None);
}
if let Some(krate) = inner.items.pop_front() {
return std::task::Poll::Ready(Some(Ok(krate)));
}
if let Some(mut fut) = inner.next_page_fetch.take() {
return match fut.poll_unpin(cx) {
std::task::Poll::Ready(res) => match res {
Ok(page) if page.crates.is_empty() => {
inner.closed = true;
std::task::Poll::Ready(None)
}
Ok(page) => {
let mut iter = page.crates.into_iter();
let next = iter.next();
inner.items.extend(iter);
std::task::Poll::Ready(next.map(Ok))
}
Err(err) => {
inner.closed = true;
std::task::Poll::Ready(Some(Err(err)))
}
},
std::task::Poll::Pending => {
inner.next_page_fetch = Some(fut);
std::task::Poll::Pending
}
};
}
let filter = inner.filter.clone();
inner.filter.page += 1;
let c = inner.client.clone();
let mut f = Box::pin(async move { c.crates(filter).await });
assert!(matches!(f.poll_unpin(cx), std::task::Poll::Pending));
inner.next_page_fetch = Some(f);
cx.waker().wake_by_ref();
std::task::Poll::Pending
}
}
impl Client {
/// Creates a client that sends `user_agent` as the `User-Agent` header on
/// every request and waits at least `rate_limit` between requests.
///
/// # Errors
///
/// Returns an error if `user_agent` is not a valid header value.
///
/// # Panics
///
/// Panics if the underlying HTTP client cannot be built.
pub fn new(
    user_agent: &str,
    rate_limit: Duration,
) -> Result<Self, reqwest::header::InvalidHeaderValue> {
    let agent = header::HeaderValue::from_str(user_agent)?;

    let mut default_headers = header::HeaderMap::new();
    default_headers.insert(header::USER_AGENT, agent);

    let http = HttpClient::builder()
        .default_headers(default_headers)
        .build()
        .unwrap();

    Ok(Self::with_http_client(http, rate_limit))
}
/// Creates a client on top of an existing HTTP client.
///
/// The new client targets `https://crates.io/api/v1/` and starts with no
/// recorded request time, so its first request is not delayed.
pub fn with_http_client(client: HttpClient, rate_limit: Duration) -> Self {
    let base_url = Url::parse("https://crates.io/api/v1/").unwrap();
    Self {
        client,
        rate_limit,
        last_request_time: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
        base_url,
    }
}
async fn get<T: DeserializeOwned>(&self, url: &Url) -> Result<T, Error> {
let mut lock = self.last_request_time.clone().lock_owned().await;
if let Some(last_request_time) = lock.take() {
if last_request_time.elapsed() < self.rate_limit {
tokio::time::sleep(self.rate_limit - last_request_time.elapsed()).await;
}
}
let time = web_time::Instant::now();
let res = self.client.get(url.clone()).send().await?;
if !res.status().is_success() {
let err = match res.status() {
StatusCode::NOT_FOUND => Error::NotFound(super::error::NotFoundError {
url: url.to_string(),
}),
StatusCode::FORBIDDEN => {
let reason = res.text().await.unwrap_or_default();
Error::PermissionDenied(super::error::PermissionDeniedError { reason })
}
_ => Error::from(res.error_for_status().unwrap_err()),
};
return Err(err);
}
let content = res.text().await?;
(*lock) = Some(time);
if let Ok(errors) = serde_json::from_str::<ApiErrors>(&content) {
return Err(Error::Api(errors));
}
let jd = &mut serde_json::Deserializer::from_str(&content);
serde_path_to_error::deserialize::<_, T>(jd).map_err(|err| {
Error::JsonDecode(JsonDecodeError {
message: format!("Could not decode JSON: {err} (path: {})", err.path()),
})
})
}
/// Fetches the `summary` endpoint.
///
/// # Errors
///
/// Returns any error produced by the underlying request or decoding.
pub async fn summary(&self) -> Result<Summary, Error> {
    let endpoint = self.base_url.join("summary").unwrap();
    self.get::<Summary>(&endpoint).await
}
/// Fetches the data returned by the crate endpoint for `crate_name`.
///
/// # Errors
///
/// Returns an error if the URL cannot be built for `crate_name`, or if the
/// request or decoding fails.
pub async fn get_crate(&self, crate_name: &str) -> Result<CrateResponse, Error> {
    let endpoint = build_crate_url(&self.base_url, crate_name)?;
    self.get::<CrateResponse>(&endpoint).await
}
/// Fetches the download data for `crate_name`.
///
/// # Errors
///
/// Returns an error if the URL cannot be built for `crate_name`, or if the
/// request or decoding fails.
pub async fn crate_downloads(&self, crate_name: &str) -> Result<CrateDownloads, Error> {
    let endpoint = build_crate_downloads_url(&self.base_url, crate_name)?;
    self.get::<CrateDownloads>(&endpoint).await
}
/// Fetches the users listed as owners of the crate `name`.
///
/// # Errors
///
/// Returns an error if the URL cannot be built for `name`, or if the
/// request or decoding fails.
pub async fn crate_owners(&self, name: &str) -> Result<Vec<User>, Error> {
    let endpoint = build_crate_owners_url(&self.base_url, name)?;
    let owners: Owners = self.get(&endpoint).await?;
    Ok(owners.users)
}
/// Fetches one page of reverse dependencies for `crate_name`.
///
/// A `page` of `0` is treated as page `1`. The returned `meta.total` is
/// copied from the response.
///
/// # Errors
///
/// Returns an error if the URL cannot be built, or if the request or
/// decoding fails.
pub async fn crate_reverse_dependencies_page(
    &self,
    crate_name: &str,
    page: u64,
) -> Result<ReverseDependencies, Error> {
    let page_number = page.max(1);
    let endpoint = build_crate_reverse_deps_url(&self.base_url, crate_name, page_number)?;
    let received: ReverseDependenciesAsReceived = self.get(&endpoint).await?;

    let mut deps = ReverseDependencies {
        dependencies: Vec::new(),
        meta: Meta {
            total: received.meta.total,
        },
    };
    deps.extend(received);
    Ok(deps)
}
/// Fetches all reverse dependencies of `crate_name`.
///
/// Pages are requested one after another, starting at page 1, until a page
/// comes back empty. `meta.total` holds the total reported by the last
/// non-empty page, or `0` if there was none.
///
/// # Errors
///
/// Returns the first error produced while fetching a page.
pub async fn crate_reverse_dependencies(
    &self,
    crate_name: &str,
) -> Result<ReverseDependencies, Error> {
    let mut collected = ReverseDependencies {
        dependencies: Vec::new(),
        meta: Meta { total: 0 },
    };

    let mut page_number: u64 = 1;
    loop {
        let page = self
            .crate_reverse_dependencies_page(crate_name, page_number)
            .await?;
        if page.dependencies.is_empty() {
            return Ok(collected);
        }
        collected.meta.total = page.meta.total;
        collected.dependencies.extend(page.dependencies);
        page_number += 1;
    }
}
/// Returns the total number of reverse dependencies of `crate_name`, as
/// reported in the metadata of the first result page.
///
/// # Errors
///
/// Returns any error produced while fetching the first page.
pub async fn crate_reverse_dependency_count(&self, crate_name: &str) -> Result<u64, Error> {
    self.crate_reverse_dependencies_page(crate_name, 1)
        .await
        .map(|first_page| first_page.meta.total)
}
/// Fetches the author names recorded for `version` of `crate_name`.
///
/// # Errors
///
/// Returns an error if the URL cannot be built, or if the request or
/// decoding fails.
pub async fn crate_authors(&self, crate_name: &str, version: &str) -> Result<Authors, Error> {
    let endpoint = build_crate_authors_url(&self.base_url, crate_name, version)?;
    let response: AuthorsResponse = self.get(&endpoint).await?;
    Ok(Authors {
        names: response.meta.names,
    })
}
/// Fetches the dependencies of `version` of `crate_name`.
///
/// # Errors
///
/// Returns an error if the URL cannot be built, or if the request or
/// decoding fails.
pub async fn crate_dependencies(
    &self,
    crate_name: &str,
    version: &str,
) -> Result<Vec<Dependency>, Error> {
    let endpoint = build_crate_dependencies_url(&self.base_url, crate_name, version)?;
    let response: Dependencies = self.get(&endpoint).await?;
    Ok(response.dependencies)
}
/// Combines `version` with its authors and dependencies into a
/// `FullVersion`.
///
/// Both lookups are driven together; the first error from either is returned.
async fn full_version(&self, version: Version) -> Result<FullVersion, Error> {
    let (authors, deps) = try_join!(
        self.crate_authors(&version.crate_name, &version.num),
        self.crate_dependencies(&version.crate_name, &version.num)
    )?;
    Ok(FullVersion::from_parts(version, authors, deps))
}
pub async fn full_crate(&self, name: &str, all_versions: bool) -> Result<FullCrate, Error> {
let krate = self.get_crate(name).await?;
let versions = if !all_versions {
self.full_version(krate.versions[0].clone())
.await
.map(|v| vec![v])
} else {
try_join_all(
krate
.versions
.clone()
.into_iter()
.map(|v| self.full_version(v)),
)
.await
}?;
let dls_fut = self.crate_downloads(name);
let owners_fut = self.crate_owners(name);
let reverse_dependencies_fut = self.crate_reverse_dependencies(name);
try_join!(dls_fut, owners_fut, reverse_dependencies_fut).map(
|(dls, owners, reverse_dependencies)| {
let data = krate.crate_data;
FullCrate {
id: data.id,
name: data.name,
description: data.description,
license: krate.versions[0].license.clone(),
documentation: data.documentation,
homepage: data.homepage,
repository: data.repository,
total_downloads: data.downloads,
recent_downloads: data.recent_downloads,
max_version: data.max_version,
max_stable_version: data.max_stable_version,
created_at: data.created_at,
updated_at: data.updated_at,
categories: krate.categories,
keywords: krate.keywords,
downloads: dls,
owners,
reverse_dependencies,
versions,
}
},
)
}
/// Fetches one page of crates matching `query`.
///
/// The query is written into the query string of the `crates` endpoint.
///
/// # Errors
///
/// Returns any error produced by the underlying request or decoding.
pub async fn crates(&self, query: CratesQuery) -> Result<CratesPage, Error> {
    let mut endpoint = self.base_url.join("crates").unwrap();
    query.build(endpoint.query_pairs_mut());
    self.get::<CratesPage>(&endpoint).await
}
/// Returns a stream over the crates matching `filter`.
///
/// The stream owns a clone of this client. Only available on non-wasm32
/// targets.
#[cfg(not(target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(not(target_arch = "wasm32"))))]
pub fn crates_stream(&self, filter: CratesQuery) -> CrateStream {
    let client = self.clone();
    CrateStream::new(client, filter)
}
/// Fetches the user with the login `username`.
///
/// `username` is appended as a single, percent-encoded path segment, so
/// characters such as `/`, `?` or `#` cannot change which endpoint is
/// requested.
///
/// # Errors
///
/// Returns any error produced by the underlying request or decoding.
pub async fn user(&self, username: &str) -> Result<User, Error> {
    let mut url = self.base_url.join("users").unwrap();
    url.path_segments_mut()
        .expect("the crates.io base URL always has a hierarchical path")
        .push(username);
    self.get::<UserResponse>(&url).await.map(|res| res.user)
}
}
/// Integration tests; they send real requests to the API at the client's
/// base URL.
#[cfg(test)]
mod test {
    use super::*;

    /// Builds the client shared by all tests, with a one second rate limit.
    fn build_test_client() -> Client {
        let rate_limit = web_time::Duration::from_millis(1000);
        Client::new(
            "crates-io-api-continuous-integration (github.com/theduke/crates-io-api)",
            rate_limit,
        )
        .unwrap()
    }

    /// The summary must contain data in every section.
    #[tokio::test]
    async fn test_summary_async() -> Result<(), Error> {
        let summary = build_test_client().summary().await?;

        assert!(!summary.most_downloaded.is_empty());
        assert!(!summary.just_updated.is_empty());
        assert!(!summary.new_crates.is_empty());
        assert!(!summary.most_recently_downloaded.is_empty());
        assert!(summary.num_crates > 0);
        assert!(summary.num_downloads > 0);
        assert!(!summary.popular_categories.is_empty());
        assert!(!summary.popular_keywords.is_empty());
        Ok(())
    }

    /// The stream must yield 40 crates with a page size of 10.
    #[tokio::test]
    async fn test_crates_stream_async() {
        let query = CratesQuery {
            per_page: 10,
            ..Default::default()
        };
        let mut stream = build_test_client().crates_stream(query);

        for _ in 0..40 {
            let krate = stream.next().await.unwrap().unwrap();
            eprintln!("CRATE {}", krate.name);
        }
    }

    /// Fetching a full crate with only its first version must succeed.
    #[tokio::test]
    async fn test_full_crate_async() -> Result<(), Error> {
        build_test_client()
            .full_crate("crates_io_api", false)
            .await
            .map(|_| ())
    }

    /// Looking up a user must return that user's login.
    #[tokio::test]
    async fn test_user_get_async() -> Result<(), Error> {
        let user = build_test_client().user("theduke").await?;
        assert_eq!(user.login, "theduke");
        Ok(())
    }

    /// Every crate returned for a user filter must list that user as owner.
    #[tokio::test]
    async fn test_crates_filter_by_user_async() -> Result<(), Error> {
        let client = build_test_client();
        let user = client.user("theduke").await?;

        let query = CratesQuery {
            user_id: Some(user.id),
            per_page: 20,
            ..Default::default()
        };
        let page = client.crates(query).await?;
        assert!(!page.crates.is_empty());

        for krate in page.crates {
            let owners = client.crate_owners(&krate.name).await?;
            assert!(owners.iter().any(|o| o.id == user.id));
        }
        Ok(())
    }

    /// Every crate returned for a category filter must be in that category.
    #[tokio::test]
    async fn test_crates_filter_by_category_async() -> Result<(), Error> {
        let client = build_test_client();
        let category = String::from("wasm");

        let query = CratesQuery {
            category: Some(category.clone()),
            per_page: 3,
            ..Default::default()
        };
        let page = client.crates(query).await?;
        assert!(!page.crates.is_empty());

        for listed in page.crates {
            let krate = client.get_crate(&listed.name).await?;
            assert!(krate.categories.iter().any(|c| c.id == category));
        }
        Ok(())
    }

    /// Filtering by ten crate ids must return exactly ten crates.
    #[tokio::test]
    async fn test_crates_filter_by_ids_async() -> Result<(), Error> {
        let client = build_test_client();
        let ids = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
            .map(Into::into)
            .to_vec();

        let query = CratesQuery {
            ids: Some(ids),
            per_page: 10,
            ..Default::default()
        };
        let res = client.crates(query).await?;

        assert_eq!(
            res.crates.len(),
            10,
            "Expected 10 crates, actually got {}. Crates: {:#?}",
            res.crates.len(),
            res.crates
        );
        Ok(())
    }

    /// The reverse dependency count of this crate must be positive.
    #[tokio::test]
    async fn test_crate_reverse_dependency_count_async() -> Result<(), Error> {
        let count = build_test_client()
            .crate_reverse_dependency_count("crates_io_api")
            .await?;
        assert!(count > 0);
        Ok(())
    }

    /// A crate name containing a slash must produce a `NotFound` error.
    #[tokio::test]
    async fn test_get_crate_with_slash() {
        let other = build_test_client().get_crate("a/b").await;
        if !matches!(other, Err(Error::NotFound(_))) {
            panic!("Invalid response: expected NotFound error, got {:?}", other);
        }
    }
}