use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
time::Duration,
};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use crate::{
types::{ComponentType, StoreEntry},
{Error, Result},
};
use super::config::{ApiConfig, CONNECT_TIMEOUT, DEFAULT_API_CONFIG, REQUEST_TIMEOUT, USER_AGENT};
use super::ocs_parser::Meta;
use super::ocs_parser::{build_category_string, parse_ocs_response};
/// Blocking HTTP client for the OCS store API.
///
/// Clones are cheap handles onto shared state: the request counter is an
/// `Arc`, so all clones observe and increment the same count.
#[derive(Clone)]
pub(crate) struct ApiClient {
    // Underlying blocking HTTP client used for all requests.
    client: reqwest::blocking::Client,
    // Static configuration: base URL, page size, and retry/backoff tuning
    // (see `fetch_all` / `fetch_page` for the fields actually read).
    config: &'static ApiConfig,
    // Total HTTP requests issued, shared across clones for instrumentation.
    request_count: Arc<AtomicUsize>,
}
impl Default for ApiClient {
fn default() -> Self {
Self::new()
}
}
impl ApiClient {
    /// Creates a client using [`DEFAULT_API_CONFIG`].
    ///
    /// # Panics
    ///
    /// Panics if the underlying HTTP client cannot be built (see
    /// [`Self::with_config`]); this is treated as an unrecoverable
    /// startup failure.
    pub fn new() -> Self {
        Self::with_config(&DEFAULT_API_CONFIG)
            .unwrap_or_else(|e| panic!("failed to create API client: {e}"))
    }

    /// Creates a client from an explicit (static) configuration.
    ///
    /// # Errors
    ///
    /// Returns an error when the `reqwest` client builder fails.
    pub(super) fn with_config(config: &'static ApiConfig) -> Result<Self> {
        let client = reqwest::blocking::Client::builder()
            .connect_timeout(CONNECT_TIMEOUT)
            .timeout(REQUEST_TIMEOUT)
            .user_agent(USER_AGENT)
            .build()?;
        Ok(Self {
            client,
            config,
            request_count: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Borrows the underlying blocking HTTP client.
    pub fn http_client(&self) -> &reqwest::blocking::Client {
        &self.client
    }

    /// Number of HTTP requests issued so far by this client and its clones.
    #[cfg(feature = "debug")]
    pub fn request_count(&self) -> usize {
        self.request_count.load(Ordering::Relaxed)
    }

    /// Shared handle to the request counter, for external instrumentation.
    pub(crate) fn request_counter(&self) -> Arc<AtomicUsize> {
        Arc::clone(&self.request_count)
    }

    /// Fetches all store entries for `categories`, following pagination.
    ///
    /// Page 0 is fetched synchronously to learn the total item count;
    /// any remaining pages are fetched in parallel with rayon. Pages
    /// that fail are logged and skipped, so the result may be partial.
    ///
    /// # Errors
    ///
    /// Fails only when the first page cannot be fetched or parsed.
    pub fn fetch_all(&self, categories: &[ComponentType]) -> Result<Vec<StoreEntry>> {
        let category_str = build_category_string(categories);
        let base_url = self.config.base_url;
        let page_size = self.config.page_size;
        let first_url = format!(
            "{base_url}/content/data?categories={category_str}&page=0&pagesize={page_size}&sort=new"
        );
        let (first_entries, meta) = self.fetch_page(&first_url)?;
        let total_items = meta.total_items;
        // Everything fit on page 0: no further requests needed.
        if total_items <= u32::from(page_size) {
            return Ok(first_entries);
        }
        let total_pages = total_items.div_ceil(u32::from(page_size));
        if total_pages > 10 {
            log::warn!(
                target: "api",
                "store returned {total_items} items across {total_pages} pages; fetch may be slow"
            );
        }
        // Fetch pages 1..total_pages concurrently; each worker keeps its
        // own Result so one bad page does not abort the whole fetch.
        let remaining_pages: Vec<u32> = (1..total_pages).collect();
        let results: Vec<Result<(Vec<StoreEntry>, _)>> = remaining_pages
            .par_iter()
            .map(|&page| {
                let url = format!(
                    "{base_url}/content/data?categories={category_str}&page={page}&pagesize={page_size}&sort=new"
                );
                self.fetch_page(&url)
            })
            .collect();
        let mut all_entries = first_entries;
        let mut error_count = 0usize;
        for result in results {
            match result {
                Ok((entries, _)) => all_entries.extend(entries),
                Err(_) => error_count += 1,
            }
        }
        if error_count > 0 {
            log::warn!(target: "api", "{error_count} page{} failed to fetch", if error_count == 1 { "" } else { "s" });
        }
        Ok(all_entries)
    }

    /// Fetches full details for each content id, in parallel.
    ///
    /// Returns one `Result` per id, in input order; an id whose response
    /// contains no entries yields `Error::ComponentNotFound`.
    pub fn fetch_details(&self, content_ids: &[u64]) -> Vec<Result<StoreEntry>> {
        content_ids
            .par_iter()
            .map(|&id| {
                let base_url = self.config.base_url;
                let url = format!("{base_url}/content/data/{id}");
                let (entries, _) = self.fetch_page(&url)?;
                entries
                    .into_iter()
                    .next()
                    .ok_or_else(|| Error::ComponentNotFound(format!("store content id {id}")))
            })
            .collect()
    }

    /// Fetches one OCS page and parses it, retrying transient failures
    /// with exponential backoff (up to `config.max_retries` attempts).
    ///
    /// A 429 response (or a parsed rate-limit error) accompanied by a
    /// `Retry-After` header is retried exactly once after the advertised
    /// delay via [`Self::send_after`]; a 429 without the header fails
    /// immediately with `Error::RateLimited`. `Error::ApiError` is never
    /// retried.
    fn fetch_page(&self, url: &str) -> Result<(Vec<StoreEntry>, Meta)> {
        let mut backoff_ms = self.config.initial_backoff_ms;
        for attempt in 0..self.config.max_retries {
            self.request_count.fetch_add(1, Ordering::Relaxed);
            // Fix: transient network errors (timeouts, connection resets)
            // now share the backoff-and-retry path. Previously `send()?`
            // aborted the whole fetch on the first network hiccup,
            // defeating the retry loop this function exists to provide.
            let r = match self.client.get(url).send() {
                Ok(response) => response,
                Err(_) if attempt + 1 < self.config.max_retries => {
                    thread::sleep(Duration::from_millis(backoff_ms.into()));
                    backoff_ms = backoff_ms.saturating_mul(2);
                    continue;
                }
                Err(e) => return Err(e.into()),
            };
            let retry_after_secs = parse_retry_after(&r);
            if r.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
                return match retry_after_secs {
                    Some(secs) => self.send_after(url, secs),
                    None => Err(Error::RateLimited),
                };
            }
            let xml = r.text()?;
            match parse_ocs_response(&xml) {
                Ok(result) => return Ok(result),
                // The body signalled rate limiting even though the status
                // was not 429; honor the header delay if one was given.
                Err(Error::RateLimited) if retry_after_secs.is_some() => {
                    return self.send_after(url, retry_after_secs.unwrap());
                }
                // Retry anything except a definitive API error, as long
                // as attempts remain.
                Err(ref e)
                    if !matches!(e, Error::ApiError(_))
                        && attempt + 1 < self.config.max_retries =>
                {
                    thread::sleep(Duration::from_millis(backoff_ms.into()));
                    backoff_ms = backoff_ms.saturating_mul(2);
                }
                Err(e) => return Err(e),
            }
        }
        // Reachable only when `max_retries` is zero: every other path
        // through the loop returns on its final attempt.
        Err(Error::other("max retries exceeded"))
    }

    /// Waits `secs` seconds (taken from a `Retry-After` header) and
    /// retries the request once; a second 429 gives up with
    /// `Error::RateLimited`.
    fn send_after(&self, url: &str, secs: u64) -> Result<(Vec<StoreEntry>, Meta)> {
        log::info!(target: "api", "rate limited, retrying after {secs}s");
        thread::sleep(Duration::from_secs(secs));
        self.request_count.fetch_add(1, Ordering::Relaxed);
        let r = self.client.get(url).send()?;
        if r.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
            return Err(Error::RateLimited);
        }
        let xml = r.text()?;
        parse_ocs_response(&xml)
    }
}
/// Extracts the `Retry-After` header as a count of whole seconds.
///
/// Returns `None` when the header is absent, not valid ASCII, or not a
/// plain non-negative integer (the HTTP-date form of `Retry-After` is
/// not recognized and also yields `None`).
fn parse_retry_after(response: &reqwest::blocking::Response) -> Option<u64> {
    let header = response.headers().get(reqwest::header::RETRY_AFTER)?;
    let text = header.to_str().ok()?;
    text.parse::<u64>().ok()
}