use governor::{
clock::DefaultClock,
state::{InMemoryState, NotKeyed},
Quota, RateLimiter,
};
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
use crate::datasets::sodir::error::{Result, SodirError};
/// Default request rate, in requests per second, used by [`ArcGISClient::new`].
pub const RATE_LIMIT_PER_SEC: u32 = 5;
/// Value sent as the `User-Agent` header on every request made by the client.
const USER_AGENT: &str = "kglite-datasets-sodir/1";
/// Unkeyed, in-memory rate limiter behind an `Arc`, so that cloned clients
/// share one limiter (and therefore one request budget).
type SharedLimiter = Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>;
/// Async HTTP client with client-side rate limiting and retry/backoff for
/// transient failures.
///
/// Cloning is cheap with respect to rate limiting: every clone holds the same
/// `Arc`'d limiter, so the configured rate applies to all clones together.
#[derive(Clone)]
pub struct ArcGISClient {
    /// Underlying HTTP client used for every GET request.
    http: reqwest::Client,
    /// Rate limiter shared by this client and all of its clones.
    limiter: SharedLimiter,
    /// Number of retries allowed after the first attempt for transient failures.
    retry_count: usize,
}

impl std::fmt::Debug for ArcGISClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The limiter's internal state is not useful in diagnostics, so it is
        // left out; `finish_non_exhaustive` marks the output as partial.
        f.debug_struct("ArcGISClient")
            .field("http", &self.http)
            .field("retry_count", &self.retry_count)
            .finish_non_exhaustive()
    }
}
impl ArcGISClient {
pub fn new() -> Result<Self> {
Self::with_options(RATE_LIMIT_PER_SEC, 3)
}
pub fn with_options(rate_per_sec: u32, retry_count: usize) -> Result<Self> {
let http = reqwest::Client::builder()
.user_agent(USER_AGENT)
.gzip(true)
.connect_timeout(Duration::from_secs(15))
.timeout(Duration::from_secs(60))
.build()?;
let rate = NonZeroU32::new(rate_per_sec)
.ok_or_else(|| SodirError::Decode("rate_per_sec must be > 0".into()))?;
let limiter = Arc::new(RateLimiter::direct(Quota::per_second(rate)));
Ok(ArcGISClient {
http,
limiter,
retry_count,
})
}
async fn fetch_once(&self, url: &str) -> Result<Vec<u8>> {
self.limiter.until_ready().await;
let resp = self.http.get(url).send().await?;
let status = resp.status();
if !status.is_success() {
return Err(SodirError::BadStatus {
status: status.as_u16(),
url: url.to_string(),
});
}
Ok(resp.bytes().await?.to_vec())
}
pub async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>> {
let mut delay_ms = 500u64;
for attempt in 0..=self.retry_count {
match self.fetch_once(url).await {
Ok(bytes) => return Ok(bytes),
Err(SodirError::BadStatus { status, .. })
if status == 429 || (500..=599).contains(&status) =>
{
if attempt == self.retry_count {
return Err(SodirError::RateLimited {
retries: self.retry_count,
});
}
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms * 2).min(30_000);
}
Err(SodirError::Http(e)) if e.is_timeout() || e.is_connect() || e.is_request() => {
if attempt == self.retry_count {
return Err(SodirError::Http(e));
}
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms * 2).min(30_000);
}
Err(other) => return Err(other),
}
}
unreachable!("loop returns or errors before completing")
}
pub async fn fetch_json(&self, url: &str) -> Result<serde_json::Value> {
let bytes = self.fetch_bytes(url).await?;
serde_json::from_slice(&bytes)
.map_err(|e| SodirError::Decode(format!("json from {url}: {e}")))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Building with the default rate and retry settings must succeed.
    #[test]
    fn client_constructs() {
        let built = ArcGISClient::new();
        assert!(built.is_ok());
    }

    /// A rate of zero cannot form a quota, so construction must fail.
    #[test]
    fn zero_rate_rejected() {
        let rejected = ArcGISClient::with_options(0, 3).is_err();
        assert!(rejected);
    }

    /// Compile-time check that the client is `Send + Sync + Clone`.
    #[test]
    fn client_is_send_sync_clone() {
        fn requires_thread_safe_clone<C: Clone + Send + Sync>() {}
        requires_thread_safe_clone::<ArcGISClient>();
    }
}