use crate::error::Error;
use crate::proxy_config::ProxyConfig;
use log::{debug, info, warn};
use std::sync::{LazyLock, Mutex};
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
/// Name of the HTTP header in which the API key is sent on authenticated requests.
const FIGMA_TOKEN_HEADER: &str = "X-Figma-Token";
/// Request timeout, in seconds, configured on the HTTP clients built in this module.
const REQUEST_TIMEOUT_SECS: u64 = 90;
/// Shared tokio runtime on which this module's synchronous entry points run
/// their async HTTP work via `block_on`.
///
/// Built lazily on first access as a multi-threaded runtime with four worker
/// threads named `dc-http-pool` and every tokio driver enabled. First access
/// panics if the runtime cannot be created.
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder
        .worker_threads(4)
        .thread_name("dc-http-pool")
        .enable_all();
    builder
        .build()
        .expect("Failed to create HTTP client tokio runtime")
});
/// Token-bucket rate limiter: holds at most `max_tokens` tokens, spends one per
/// acquisition, and earns tokens back in proportion to elapsed time.
pub struct RateLimiter {
/// Tokens available as of the last refill.
tokens: f64,
/// Upper bound on `tokens`; also the initial fill level.
max_tokens: f64,
/// Tokens earned per second of elapsed time.
refill_rate: f64,
/// Instant at which `tokens` was last brought up to date.
last_refill: Instant,
}
impl RateLimiter {
    /// Wait reported when no meaningful wait can be computed: the refill rate
    /// is zero/negative, or the computed wait is not representable as a
    /// `Duration` (NaN or overflow).
    const FALLBACK_WAIT: Duration = Duration::from_secs(1);

    /// Creates a limiter that starts with a full bucket of `max_tokens` tokens
    /// and earns `refill_rate` tokens per second, never exceeding `max_tokens`.
    pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
        RateLimiter {
            tokens: max_tokens,
            max_tokens,
            refill_rate,
            last_refill: Instant::now(),
        }
    }

    /// Credits the tokens earned since the previous refill, capped at
    /// `max_tokens`, and records the current instant as the new refill point.
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        // A negative (or NaN) rate must never drain or corrupt the bucket;
        // `f64::max` maps both cases to "no refill".
        let earned = elapsed * self.refill_rate.max(0.0);
        self.tokens = (self.tokens + earned).min(self.max_tokens);
        self.last_refill = now;
    }

    /// Tries to take one token without blocking.
    ///
    /// Returns `None` when a token was available and has been consumed.
    /// Otherwise returns `Some(wait)`, the estimated time until one full token
    /// is available; no token is consumed in that case.
    ///
    /// When the refill rate is zero or negative, or the estimate cannot be
    /// represented as a `Duration`, a fixed one-second wait is returned
    /// instead of panicking.
    pub fn acquire(&mut self) -> Option<Duration> {
        self.refill();
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            return None;
        }
        if self.refill_rate <= 0.0 {
            return Some(Self::FALLBACK_WAIT);
        }
        let deficit = 1.0 - self.tokens;
        // `try_from_secs_f64` rejects NaN, negative and overflowing values that
        // would make `from_secs_f64` panic (e.g. a vanishingly small rate).
        let wait = Duration::try_from_secs_f64(deficit / self.refill_rate)
            .unwrap_or(Self::FALLBACK_WAIT);
        Some(wait)
    }

    /// Takes one token, sleeping the current thread first if none is available.
    ///
    /// After the sleep the bucket is refilled and one token is deducted; the
    /// balance is clamped at zero, so the call always returns after a single
    /// wait even if the bucket did not fully recover.
    pub fn acquire_blocking(&mut self) {
        let Some(wait) = self.acquire() else {
            return;
        };
        warn!(
            "Rate limiter: waiting {:.0}ms for token (bucket: {:.1}/{:.0})",
            wait.as_millis(),
            self.tokens,
            self.max_tokens
        );
        std::thread::sleep(wait);
        self.refill();
        self.tokens = (self.tokens - 1.0).max(0.0);
    }

    /// Returns the token balance as of the last refill; this does not itself
    /// trigger a refill.
    pub fn available_tokens(&self) -> f64 {
        self.tokens
    }
}
/// Process-wide limiter consulted by `acquire_rate_token`: a bucket of 30
/// tokens that refills at 10 tokens per second, created on first use.
static RATE_LIMITER: LazyLock<Mutex<RateLimiter>> = LazyLock::new(|| {
    let limiter = RateLimiter::new(30.0, 10.0);
    Mutex::new(limiter)
});
/// Takes one token from the global `RATE_LIMITER`, blocking the calling thread
/// (while holding the limiter's lock) until the token is granted.
///
/// A poisoned lock is recovered instead of being ignored, so a panic in some
/// other thread can never silently switch rate limiting off.
fn acquire_rate_token() {
    // The guarded state is only token counters and a timestamp, so it remains
    // usable even if a previous holder panicked.
    let mut limiter = RATE_LIMITER
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    debug!(
        "Rate limiter: acquiring token (available: {:.1}/{:.0})",
        limiter.available_tokens(),
        limiter.max_tokens
    );
    limiter.acquire_blocking();
}
fn build_async_client(proxy_config: &ProxyConfig) -> Result<reqwest::Client, Error> {
let mut builder = reqwest::Client::builder().timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS));
if let ProxyConfig::HttpProxyConfig(spec) = proxy_config {
builder = builder.proxy(reqwest::Proxy::all(spec)?);
}
builder.build().map_err(Error::from)
}
/// Synchronously GETs `url`, sending `api_key` in the token header, and returns
/// the response body as text.
///
/// Takes one rate-limit token first (which may block), then blocks the calling
/// thread on the shared runtime until the request completes.
///
/// # Errors
/// Propagates any error produced by `async_http_fetch`.
pub fn http_fetch(api_key: &str, url: String, proxy_config: &ProxyConfig) -> Result<String, Error> {
    acquire_rate_token();
    let request = async_http_fetch(api_key, url, proxy_config);
    RUNTIME.block_on(request)
}
/// GETs `url` with `api_key` in the token header and returns the body as text.
///
/// A fresh client is built for the call from `proxy_config`.
///
/// # Errors
/// Fails if the client cannot be built, the request cannot be sent, the server
/// answers with an error status (per `error_for_status`), or the body cannot be
/// read as text.
async fn async_http_fetch(
    api_key: &str,
    url: String,
    proxy_config: &ProxyConfig,
) -> Result<String, Error> {
    let client = build_async_client(proxy_config)?;
    let request = client
        .get(url.as_str())
        .header(FIGMA_TOKEN_HEADER, api_key);
    let response = request.send().await?.error_for_status()?;
    let body = response.text().await?;
    Ok(body)
}
/// Synchronously GETs `url` (no token header is sent) and returns the raw
/// response body.
///
/// Takes one rate-limit token first (which may block), then blocks the calling
/// thread on the shared runtime until the download completes.
///
/// # Errors
/// Propagates any error produced by `async_http_fetch_bytes`.
pub fn http_fetch_bytes(url: String, proxy_config: &ProxyConfig) -> Result<Vec<u8>, Error> {
    acquire_rate_token();
    let download = async_http_fetch_bytes(url, proxy_config);
    RUNTIME.block_on(download)
}
/// GETs `url` without any authentication header and returns the response body
/// as bytes.
///
/// The request relies on the timeout already configured on the client by
/// `build_async_client`, matching `async_http_fetch`; no per-request timeout is
/// set on top of it.
///
/// # Errors
/// Fails if the client cannot be built, the request cannot be sent, the server
/// answers with an error status (per `error_for_status`), or the body cannot be
/// read.
async fn async_http_fetch_bytes(url: String, proxy_config: &ProxyConfig) -> Result<Vec<u8>, Error> {
    let client = build_async_client(proxy_config)?;
    let response = client
        .get(url.as_str())
        .send()
        .await?
        .error_for_status()?;
    let bytes = response.bytes().await?;
    Ok(bytes.to_vec())
}
/// One entry of a batch fetch: a URL to GET plus a caller-chosen identifier.
#[derive(Clone, Debug)]
pub struct BatchRequest {
/// Identifier copied verbatim into the matching `BatchResponse::id`.
pub id: String,
/// URL that is fetched for this entry.
pub url: String,
}
/// Outcome of a single `BatchRequest` processed by `http_fetch_batch`.
#[derive(Debug)]
pub struct BatchResponse {
/// Identifier taken from the originating `BatchRequest::id`.
pub id: String,
/// Response body as text on success, or the error that made this entry fail.
pub result: Result<String, Error>,
}
pub fn http_fetch_batch(
api_key: &str,
requests: Vec<BatchRequest>,
proxy_config: &ProxyConfig,
) -> Vec<BatchResponse> {
if requests.is_empty() {
return vec![];
}
for _ in 0..requests.len() {
acquire_rate_token();
}
info!("http_fetch_batch: fetching {} URLs concurrently", requests.len());
RUNTIME.block_on(async {
let client = match build_async_client(proxy_config) {
Ok(c) => c,
Err(e) => {
return requests
.into_iter()
.map(|r| BatchResponse {
id: r.id,
result: Err(Error::HttpClientError(format!(
"Failed to build HTTP client: {}",
e
))),
})
.collect();
}
};
let futures: Vec<_> = requests
.into_iter()
.map(|req| {
let client = client.clone();
let api_key = api_key.to_string();
async move {
let result = client
.get(req.url.as_str())
.header(FIGMA_TOKEN_HEADER, &api_key)
.send()
.await
.and_then(|r| r.error_for_status());
let result = match result {
Ok(response) => response.text().await.map_err(Error::from),
Err(e) => Err(Error::from(e)),
};
BatchResponse { id: req.id, result }
}
})
.collect();
futures::future::join_all(futures).await
})
}
// Unit tests for the shared runtime, the batch fetch entry point and the
// `RateLimiter` token bucket.
#[cfg(test)]
mod tests {
use super::*;
// Forces the lazily-initialized runtime to be built; fails only if
// construction panics.
#[test]
fn test_runtime_creates_successfully() {
let _runtime = &*RUNTIME;
}
// An empty batch must return an empty result vector.
#[test]
fn test_empty_batch_returns_empty() {
let results = http_fetch_batch("fake_key", vec![], &ProxyConfig::None);
assert!(results.is_empty());
}
// Both requests target port 1 on localhost and are asserted to fail; each
// response must still carry the id of its request.
#[test]
fn test_batch_request_preserves_ids() {
let requests = vec![
BatchRequest {
id: "req1".to_string(),
url: "http://localhost:1/nonexistent1".to_string(),
},
BatchRequest {
id: "req2".to_string(),
url: "http://localhost:1/nonexistent2".to_string(),
},
];
let results = http_fetch_batch("fake_key", requests, &ProxyConfig::None);
assert_eq!(results.len(), 2);
let ids: std::collections::HashSet<_> = results.iter().map(|r| r.id.clone()).collect();
assert!(ids.contains("req1"));
assert!(ids.contains("req2"));
assert!(results.iter().all(|r| r.result.is_err()));
}
// A full bucket of 10 grants 10 immediate acquisitions; the 11th must report
// a wait.
#[test]
fn test_rate_limiter_immediate_acquire() {
let mut limiter = RateLimiter::new(10.0, 5.0);
for _ in 0..10 {
assert!(limiter.acquire().is_none(), "Should acquire immediately");
}
assert!(limiter.acquire().is_some(), "Should require wait after exhausting burst");
}
// After draining the bucket, 50ms at 100 tokens/s should restore at least 3
// tokens.
#[test]
fn test_rate_limiter_refill() {
let mut limiter = RateLimiter::new(5.0, 100.0); for _ in 0..5 {
limiter.acquire();
}
assert!(limiter.available_tokens() < 1.0);
std::thread::sleep(Duration::from_millis(50));
limiter.refill();
assert!(
limiter.available_tokens() >= 3.0,
"Should have refilled: got {}",
limiter.available_tokens()
);
}
// Refilling an already-full bucket must not push it above `max_tokens`.
#[test]
fn test_rate_limiter_max_cap() {
let mut limiter = RateLimiter::new(5.0, 100.0);
std::thread::sleep(Duration::from_millis(200));
limiter.refill();
assert!(
limiter.available_tokens() <= 5.0,
"Should not exceed max: got {}",
limiter.available_tokens()
);
}
// With an empty bucket at 10 tokens/s the reported wait is expected to be
// about 100ms; only the upper bound (150ms) is asserted.
#[test]
fn test_rate_limiter_wait_duration_calculation() {
let mut limiter = RateLimiter::new(2.0, 10.0); assert!(limiter.acquire().is_none());
assert!(limiter.acquire().is_none());
let wait = limiter.acquire();
assert!(wait.is_some());
let wait_ms = wait.unwrap().as_millis();
assert!(
wait_ms <= 150, "Wait should be ~100ms, got {}ms",
wait_ms
);
}
// The second blocking acquire on a 1-token bucket at 100 tokens/s must sleep
// for roughly one refill interval.
// NOTE(review): the 5..=50ms window is measured in wall-clock time and may be
// sensitive to scheduler load on busy machines - confirm it is stable in CI.
#[test]
fn test_rate_limiter_blocking_acquire() {
let mut limiter = RateLimiter::new(1.0, 100.0); limiter.acquire_blocking();
let before = Instant::now();
limiter.acquire_blocking();
let elapsed = before.elapsed();
assert!(
elapsed.as_millis() >= 5 && elapsed.as_millis() <= 50,
"Blocking acquire should take ~10ms, took {}ms",
elapsed.as_millis()
);
}
// Five threads take two tokens each (10 in total) from a shared 10-token
// limiter behind a mutex; afterwards fewer than 2 tokens may remain.
#[test]
fn test_rate_limiter_concurrent_safety() {
use std::sync::{Arc, Mutex};
let limiter = Arc::new(Mutex::new(RateLimiter::new(10.0, 100.0)));
let mut handles = vec![];
for _ in 0..5 {
let lim = Arc::clone(&limiter);
handles.push(std::thread::spawn(move || {
for _ in 0..2 {
let mut l = lim.lock().unwrap();
l.acquire_blocking();
}
}));
}
for h in handles {
h.join().unwrap();
}
let l = limiter.lock().unwrap();
assert!(
l.available_tokens() < 2.0,
"Tokens should be mostly consumed: got {}",
l.available_tokens()
);
}
// With a zero refill rate the bucket never recovers; once drained, `acquire`
// must report the fixed one-second fallback wait.
#[test]
fn test_rate_limiter_zero_refill_rate() {
let mut limiter = RateLimiter::new(3.0, 0.0);
assert!(limiter.acquire().is_none());
assert!(limiter.acquire().is_none());
assert!(limiter.acquire().is_none());
let wait = limiter.acquire();
assert!(wait.is_some());
assert_eq!(wait.unwrap().as_secs(), 1, "Zero refill rate should return 1s fallback");
}
}