1#[cfg(not(target_arch = "wasm32"))]
2use futures::future::BoxFuture;
3#[cfg(not(target_arch = "wasm32"))]
4use futures::prelude::*;
5use futures::{future::try_join_all, try_join};
6use reqwest::{header, Client as HttpClient, StatusCode, Url};
7use serde::de::DeserializeOwned;
8
9#[cfg(not(target_arch = "wasm32"))]
10use std::collections::VecDeque;
11
12use web_time::Duration;
13
14use super::Error;
15use crate::error::JsonDecodeError;
16use crate::types::*;
17use crate::util::*;
18
19#[derive(Clone)]
21pub struct Client {
22 client: HttpClient,
23 rate_limit: Duration,
24 last_request_time: std::sync::Arc<tokio::sync::Mutex<Option<web_time::Instant>>>,
25 base_url: Url,
26}
27
28#[cfg(not(target_arch = "wasm32"))]
29#[cfg_attr(docsrs, doc(cfg(not(target_arch = "wasm32"))))]
30pub struct CrateStream {
31 client: Client,
32 filter: CratesQuery,
33
34 closed: bool,
35 items: VecDeque<Crate>,
36 next_page_fetch: Option<BoxFuture<'static, Result<CratesPage, Error>>>,
37}
38
39#[cfg(not(target_arch = "wasm32"))]
40#[cfg_attr(docsrs, doc(cfg(not(target_arch = "wasm32"))))]
41impl CrateStream {
42 fn new(client: Client, filter: CratesQuery) -> Self {
43 Self {
44 client,
45 filter,
46 closed: false,
47 items: VecDeque::new(),
48 next_page_fetch: None,
49 }
50 }
51}
52
53#[cfg(not(target_arch = "wasm32"))]
54#[cfg_attr(docsrs, doc(cfg(not(target_arch = "wasm32"))))]
55impl futures::stream::Stream for CrateStream {
56 type Item = Result<Crate, Error>;
57
58 fn poll_next(
59 self: std::pin::Pin<&mut Self>,
60 cx: &mut std::task::Context<'_>,
61 ) -> std::task::Poll<Option<Self::Item>> {
62 let inner = self.get_mut();
63
64 if inner.closed {
65 return std::task::Poll::Ready(None);
66 }
67
68 if let Some(krate) = inner.items.pop_front() {
69 return std::task::Poll::Ready(Some(Ok(krate)));
70 }
71
72 if let Some(mut fut) = inner.next_page_fetch.take() {
73 return match fut.poll_unpin(cx) {
74 std::task::Poll::Ready(res) => match res {
75 Ok(page) if page.crates.is_empty() => {
76 inner.closed = true;
77 std::task::Poll::Ready(None)
78 }
79 Ok(page) => {
80 let mut iter = page.crates.into_iter();
81 let next = iter.next();
82 inner.items.extend(iter);
83
84 std::task::Poll::Ready(next.map(Ok))
85 }
86 Err(err) => {
87 inner.closed = true;
88 std::task::Poll::Ready(Some(Err(err)))
89 }
90 },
91 std::task::Poll::Pending => {
92 inner.next_page_fetch = Some(fut);
93 std::task::Poll::Pending
94 }
95 };
96 }
97
98 let filter = inner.filter.clone();
99 inner.filter.page += 1;
100
101 let c = inner.client.clone();
102 let mut f = Box::pin(async move { c.crates(filter).await });
103 assert!(matches!(f.poll_unpin(cx), std::task::Poll::Pending));
104 inner.next_page_fetch = Some(f);
105
106 cx.waker().wake_by_ref();
107
108 std::task::Poll::Pending
109 }
110}
111
112impl Client {
113 pub fn new(
137 user_agent: &str,
138 rate_limit: Duration,
139 ) -> Result<Self, reqwest::header::InvalidHeaderValue> {
140 let mut headers = header::HeaderMap::new();
141 headers.insert(
142 header::USER_AGENT,
143 header::HeaderValue::from_str(user_agent)?,
144 );
145
146 let client = HttpClient::builder()
147 .default_headers(headers)
148 .build()
149 .unwrap();
150
151 Ok(Self::with_http_client(client, rate_limit))
152 }
153
154 pub fn with_http_client(client: HttpClient, rate_limit: Duration) -> Self {
163 let limiter = std::sync::Arc::new(tokio::sync::Mutex::new(None));
164
165 Self {
166 rate_limit,
167 last_request_time: limiter,
168 client,
169 base_url: Url::parse("https://crates.io/api/v1/").unwrap(),
170 }
171 }
172
173 async fn get<T: DeserializeOwned>(&self, url: &Url) -> Result<T, Error> {
174 let mut lock = self.last_request_time.clone().lock_owned().await;
175
176 if let Some(last_request_time) = lock.take() {
177 if last_request_time.elapsed() < self.rate_limit {
178 tokio::time::sleep(self.rate_limit - last_request_time.elapsed()).await;
179 }
180 }
181
182 let time = web_time::Instant::now();
183 let res = self.client.get(url.clone()).send().await?;
184
185 if !res.status().is_success() {
186 let err = match res.status() {
187 StatusCode::NOT_FOUND => Error::NotFound(super::error::NotFoundError {
188 url: url.to_string(),
189 }),
190 StatusCode::FORBIDDEN => {
191 let reason = res.text().await.unwrap_or_default();
192 Error::PermissionDenied(super::error::PermissionDeniedError { reason })
193 }
194 _ => Error::from(res.error_for_status().unwrap_err()),
195 };
196
197 return Err(err);
198 }
199
200 let content = res.text().await?;
201
202 (*lock) = Some(time);
204
205 if let Ok(errors) = serde_json::from_str::<ApiErrors>(&content) {
208 return Err(Error::Api(errors));
209 }
210
211 let jd = &mut serde_json::Deserializer::from_str(&content);
212 serde_path_to_error::deserialize::<_, T>(jd).map_err(|err| {
213 Error::JsonDecode(JsonDecodeError {
214 message: format!("Could not decode JSON: {err} (path: {})", err.path()),
215 })
216 })
217 }
218
219 pub async fn summary(&self) -> Result<Summary, Error> {
221 let url = self.base_url.join("summary").unwrap();
222 self.get(&url).await
223 }
224
225 pub async fn get_crate(&self, crate_name: &str) -> Result<CrateResponse, Error> {
229 let url = build_crate_url(&self.base_url, crate_name)?;
230
231 self.get(&url).await
232 }
233
234 pub async fn crate_downloads(&self, crate_name: &str) -> Result<CrateDownloads, Error> {
236 let url = build_crate_downloads_url(&self.base_url, crate_name)?;
237 self.get(&url).await
238 }
239
240 pub async fn crate_owners(&self, name: &str) -> Result<Vec<User>, Error> {
242 let url = build_crate_owners_url(&self.base_url, name)?;
243 self.get::<Owners>(&url).await.map(|data| data.users)
244 }
245
246 pub async fn crate_reverse_dependencies_page(
250 &self,
251 crate_name: &str,
252 page: u64,
253 ) -> Result<ReverseDependencies, Error> {
254 let page = page.max(1);
256
257 let url = build_crate_reverse_deps_url(&self.base_url, crate_name, page)?;
258 let page = self.get::<ReverseDependenciesAsReceived>(&url).await?;
259
260 let mut deps = ReverseDependencies {
261 dependencies: Vec::new(),
262 meta: Meta { total: 0 },
263 };
264 deps.meta.total = page.meta.total;
265 deps.extend(page);
266 Ok(deps)
267 }
268
269 pub async fn crate_reverse_dependencies(
275 &self,
276 crate_name: &str,
277 ) -> Result<ReverseDependencies, Error> {
278 let mut deps = ReverseDependencies {
279 dependencies: Vec::new(),
280 meta: Meta { total: 0 },
281 };
282
283 for page_number in 1.. {
284 let page = self
285 .crate_reverse_dependencies_page(crate_name, page_number)
286 .await?;
287 if page.dependencies.is_empty() {
288 break;
289 }
290 deps.dependencies.extend(page.dependencies);
291 deps.meta.total = page.meta.total;
292 }
293
294 Ok(deps)
295 }
296
297 pub async fn crate_reverse_dependency_count(&self, crate_name: &str) -> Result<u64, Error> {
299 let page = self.crate_reverse_dependencies_page(crate_name, 1).await?;
300 Ok(page.meta.total)
301 }
302
303 pub async fn crate_authors(&self, crate_name: &str, version: &str) -> Result<Authors, Error> {
305 let url = build_crate_authors_url(&self.base_url, crate_name, version)?;
306 self.get::<AuthorsResponse>(&url).await.map(|res| Authors {
307 names: res.meta.names,
308 })
309 }
310
311 pub async fn crate_dependencies(
313 &self,
314 crate_name: &str,
315 version: &str,
316 ) -> Result<Vec<Dependency>, Error> {
317 let url = build_crate_dependencies_url(&self.base_url, crate_name, version)?;
318 self.get::<Dependencies>(&url)
319 .await
320 .map(|res| res.dependencies)
321 }
322
323 async fn full_version(&self, version: Version) -> Result<FullVersion, Error> {
324 let authors_fut = self.crate_authors(&version.crate_name, &version.num);
325 let deps_fut = self.crate_dependencies(&version.crate_name, &version.num);
326 try_join!(authors_fut, deps_fut)
327 .map(|(authors, deps)| FullVersion::from_parts(version, authors, deps))
328 }
329
330 pub async fn full_crate(&self, name: &str, all_versions: bool) -> Result<FullCrate, Error> {
339 let krate = self.get_crate(name).await?;
340 let versions = if !all_versions {
341 self.full_version(krate.versions[0].clone())
342 .await
343 .map(|v| vec![v])
344 } else {
345 try_join_all(
346 krate
347 .versions
348 .clone()
349 .into_iter()
350 .map(|v| self.full_version(v)),
351 )
352 .await
353 }?;
354 let dls_fut = self.crate_downloads(name);
355 let owners_fut = self.crate_owners(name);
356 let reverse_dependencies_fut = self.crate_reverse_dependencies(name);
357 try_join!(dls_fut, owners_fut, reverse_dependencies_fut).map(
358 |(dls, owners, reverse_dependencies)| {
359 let data = krate.crate_data;
360 FullCrate {
361 id: data.id,
362 name: data.name,
363 description: data.description,
364 license: krate.versions[0].license.clone(),
365 documentation: data.documentation,
366 homepage: data.homepage,
367 repository: data.repository,
368 total_downloads: data.downloads,
369 recent_downloads: data.recent_downloads,
370 max_version: data.max_version,
371 max_stable_version: data.max_stable_version,
372 created_at: data.created_at,
373 updated_at: data.updated_at,
374 categories: krate.categories,
375 keywords: krate.keywords,
376 downloads: dls,
377 owners,
378 reverse_dependencies,
379 versions,
380 }
381 },
382 )
383 }
384
385 pub async fn crates(&self, query: CratesQuery) -> Result<CratesPage, Error> {
390 let mut url = self.base_url.join("crates").unwrap();
391 query.build(url.query_pairs_mut());
392 self.get(&url).await
393 }
394
395 #[cfg(not(target_arch = "wasm32"))]
397 #[cfg_attr(docsrs, doc(cfg(not(target_arch = "wasm32"))))]
398 pub fn crates_stream(&self, filter: CratesQuery) -> CrateStream {
399 CrateStream::new(self.clone(), filter)
400 }
401
402 pub async fn user(&self, username: &str) -> Result<User, Error> {
404 let url = self.base_url.join(&format!("users/{}", username)).unwrap();
405 self.get::<UserResponse>(&url).await.map(|res| res.user)
406 }
407}
408
409#[cfg(test)]
410mod test {
411 use super::*;
412
413 fn build_test_client() -> Client {
414 Client::new(
415 "crates-io-api-continuous-integration (github.com/theduke/crates-io-api)",
416 web_time::Duration::from_millis(1000),
417 )
418 .unwrap()
419 }
420
421 #[tokio::test]
422 async fn test_summary_async() -> Result<(), Error> {
423 let client = build_test_client();
424 let summary = client.summary().await?;
425 assert!(!summary.most_downloaded.is_empty());
426 assert!(!summary.just_updated.is_empty());
427 assert!(!summary.new_crates.is_empty());
428 assert!(!summary.most_recently_downloaded.is_empty());
429 assert!(summary.num_crates > 0);
430 assert!(summary.num_downloads > 0);
431 assert!(!summary.popular_categories.is_empty());
432 assert!(!summary.popular_keywords.is_empty());
433 Ok(())
434 }
435
436 #[tokio::test]
437 async fn test_crates_stream_async() {
438 let client = build_test_client();
439
440 let mut stream = client.crates_stream(CratesQuery {
441 per_page: 10,
442 ..Default::default()
443 });
444
445 for _ in 0..40 {
446 let _krate = stream.next().await.unwrap().unwrap();
447 eprintln!("CRATE {}", _krate.name);
448 }
449 }
450
451 #[tokio::test]
452 async fn test_full_crate_async() -> Result<(), Error> {
453 let client = build_test_client();
454 client.full_crate("crates_io_api", false).await?;
455
456 Ok(())
457 }
458
459 #[tokio::test]
460 async fn test_user_get_async() -> Result<(), Error> {
461 let client = build_test_client();
462 let user = client.user("theduke").await?;
463 assert_eq!(user.login, "theduke");
464 Ok(())
465 }
466
467 #[tokio::test]
468 async fn test_crates_filter_by_user_async() -> Result<(), Error> {
469 let client = build_test_client();
470
471 let user = client.user("theduke").await?;
472
473 let res = client
474 .crates(CratesQuery {
475 user_id: Some(user.id),
476 per_page: 20,
477 ..Default::default()
478 })
479 .await?;
480
481 assert!(!res.crates.is_empty());
482 for krate in res.crates {
484 let owners = client.crate_owners(&krate.name).await?;
485 assert!(owners.iter().any(|o| o.id == user.id));
486 }
487
488 Ok(())
489 }
490
491 #[tokio::test]
492 async fn test_crates_filter_by_category_async() -> Result<(), Error> {
493 let client = build_test_client();
494
495 let category = "wasm".to_string();
496
497 let res = client
498 .crates(CratesQuery {
499 category: Some(category.clone()),
500 per_page: 3,
501 ..Default::default()
502 })
503 .await?;
504
505 assert!(!res.crates.is_empty());
506 for list_crate in res.crates {
508 let krate = client.get_crate(&list_crate.name).await?;
509 assert!(krate.categories.iter().any(|c| c.id == category));
510 }
511
512 Ok(())
513 }
514
515 #[tokio::test]
516 async fn test_crates_filter_by_ids_async() -> Result<(), Error> {
517 let client = build_test_client();
518
519 let ids = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
520 .map(Into::into)
521 .to_vec();
522 let res = client
523 .crates(CratesQuery {
524 ids: Some(ids),
525 per_page: 10,
526 ..Default::default()
527 })
528 .await?;
529
530 assert_eq!(
531 res.crates.len(),
532 10,
533 "Expected 10 crates, actually got {}. Crates: {:#?}",
534 res.crates.len(),
535 res.crates
536 );
537 Ok(())
538 }
539
540 #[tokio::test]
541 async fn test_crate_reverse_dependency_count_async() -> Result<(), Error> {
542 let client = build_test_client();
543 let count = client
544 .crate_reverse_dependency_count("crates_io_api")
545 .await?;
546 assert!(count > 0);
547
548 Ok(())
549 }
550
551 #[tokio::test]
553 async fn test_get_crate_with_slash() {
554 let client = build_test_client();
555 match client.get_crate("a/b").await {
556 Err(Error::NotFound(_)) => {}
557 other => {
558 panic!("Invalid response: expected NotFound error, got {:?}", other);
559 }
560 }
561 }
562}