use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method { Get, Post, Put, Delete, Patch, Head, Options }
impl Method {
pub fn as_str(self) -> &'static str {
match self {
Self::Get => "GET",
Self::Post => "POST",
Self::Put => "PUT",
Self::Delete => "DELETE",
Self::Patch => "PATCH",
Self::Head => "HEAD",
Self::Options => "OPTIONS",
}
}
}
#[derive(Debug, Clone)]
pub struct HttpRequest {
pub id: RequestId,
pub method: Method,
pub url: String,
pub headers: HashMap<String, String>,
pub body: Option<Vec<u8>>,
pub timeout: Duration,
pub max_retries: u32,
pub priority: i32,
pub tag: Option<String>,
pub cache_policy: CachePolicy,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CachePolicy {
NoStore,
UseCache,
Revalidate,
NoCache,
}
impl HttpRequest {
pub fn get(url: impl Into<String>) -> Self {
Self::new(Method::Get, url)
}
pub fn post(url: impl Into<String>, body: Vec<u8>) -> Self {
let mut r = Self::new(Method::Post, url);
r.body = Some(body);
r
}
pub fn post_json(url: impl Into<String>, json: impl Into<String>) -> Self {
let mut r = Self::new(Method::Post, url);
r.body = Some(json.into().into_bytes());
r.headers.insert("Content-Type".into(), "application/json".into());
r
}
pub fn new(method: Method, url: impl Into<String>) -> Self {
Self {
id: RequestId::next(),
method,
url: url.into(),
headers: HashMap::new(),
body: None,
timeout: Duration::from_secs(10),
max_retries: 3,
priority: 0,
tag: None,
cache_policy: CachePolicy::UseCache,
}
}
pub fn with_header(mut self, key: impl Into<String>, val: impl Into<String>) -> Self {
self.headers.insert(key.into(), val.into());
self
}
pub fn with_timeout(mut self, t: Duration) -> Self { self.timeout = t; self }
pub fn with_retries(mut self, n: u32) -> Self { self.max_retries = n; self }
pub fn with_priority(mut self, p: i32) -> Self { self.priority = p; self }
pub fn with_tag(mut self, t: impl Into<String>) -> Self { self.tag = Some(t.into()); self }
pub fn with_cache(mut self, p: CachePolicy) -> Self { self.cache_policy = p; self }
pub fn bearer_auth(self, token: impl Into<String>) -> Self {
self.with_header("Authorization", format!("Bearer {}", token.into()))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RequestId(pub u64);
impl RequestId {
pub fn next() -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
Self(COUNTER.fetch_add(1, Ordering::Relaxed))
}
}
#[derive(Debug, Clone)]
pub struct HttpResponse {
pub status: u16,
pub headers: HashMap<String, String>,
pub body: Vec<u8>,
pub text: Option<String>,
pub latency: Duration,
pub from_cache: bool,
}
impl HttpResponse {
pub fn is_success(&self) -> bool { (200..300).contains(&self.status) }
pub fn is_client_error(&self) -> bool { (400..500).contains(&self.status) }
pub fn is_server_error(&self) -> bool { (500..600).contains(&self.status) }
pub fn is_not_modified(&self) -> bool { self.status == 304 }
pub fn content_type(&self) -> Option<&str> {
self.headers.get("content-type").map(|s| s.as_str())
}
pub fn etag(&self) -> Option<&str> {
self.headers.get("etag").map(|s| s.as_str())
}
pub fn last_modified(&self) -> Option<&str> {
self.headers.get("last-modified").map(|s| s.as_str())
}
pub fn text_body(&self) -> &str {
self.text.as_deref().unwrap_or("")
}
pub fn json_field(&self, key: &str) -> Option<String> {
let text = self.text.as_ref()?;
let search = format!("\"{}\":", key);
let pos = text.find(&search)?;
let after = text[pos + search.len()..].trim_start();
if after.starts_with('"') {
let end = after[1..].find('"')?;
Some(after[1..end+1].to_owned())
} else {
let end = after.find([',', '}', '\n']).unwrap_or(after.len());
Some(after[..end].trim().to_owned())
}
}
}
#[derive(Debug, Clone)]
pub enum HttpEvent {
Success { id: RequestId, response: HttpResponse },
Failure { id: RequestId, error: HttpError, url: String },
Timeout { id: RequestId, url: String },
Cancelled { id: RequestId },
RateLimited { id: RequestId, delay_ms: u64 },
}
#[derive(Debug, Clone)]
pub enum HttpError {
ConnectionFailed(String),
DnsFailure(String),
TlsError(String),
ServerError(u16, String),
ReadError(String),
InvalidRequest(String),
RetriesExhausted { attempts: u32 },
}
impl std::fmt::Display for HttpError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ConnectionFailed(s) => write!(f, "Connection failed: {}", s),
Self::DnsFailure(s) => write!(f, "DNS failure: {}", s),
Self::TlsError(s) => write!(f, "TLS error: {}", s),
Self::ServerError(c, s) => write!(f, "HTTP {}: {}", c, s),
Self::ReadError(s) => write!(f, "Read error: {}", s),
Self::InvalidRequest(s) => write!(f, "Invalid request: {}", s),
Self::RetriesExhausted { attempts } => write!(f, "Failed after {} attempts", attempts),
}
}
}
#[derive(Debug, Clone)]
struct CacheEntry {
response: HttpResponse,
etag: Option<String>,
last_modified: Option<String>,
stored_at: Instant,
ttl: Duration,
}
impl CacheEntry {
fn is_fresh(&self) -> bool {
self.stored_at.elapsed() < self.ttl
}
}
#[derive(Debug, Clone)]
pub struct RateLimiter {
pub limit: u32,
pub window: f32,
tokens: f32,
last_refill: Option<Instant>,
}
impl RateLimiter {
pub fn new(limit: u32, window_secs: f32) -> Self {
Self { limit, window: window_secs, tokens: limit as f32, last_refill: None }
}
pub fn try_consume(&mut self) -> bool {
self.refill();
if self.tokens >= 1.0 {
self.tokens -= 1.0;
true
} else {
false
}
}
fn refill(&mut self) {
let now = Instant::now();
if let Some(last) = self.last_refill {
let elapsed = last.elapsed().as_secs_f32();
let rate = self.limit as f32 / self.window.max(1e-3);
self.tokens = (self.tokens + rate * elapsed).min(self.limit as f32);
}
self.last_refill = Some(now);
}
pub fn wait_time(&self) -> f32 {
if self.tokens >= 1.0 { return 0.0; }
let rate = self.limit as f32 / self.window.max(1e-3);
(1.0 - self.tokens) / rate.max(1e-6)
}
}
#[derive(Debug)]
struct InFlightRequest {
request: HttpRequest,
attempt: u32,
started: Instant,
retry_after: Option<Instant>,
state: RequestState,
}
#[derive(Debug)]
enum RequestState {
Pending,
Sent { sent_at: Instant },
Receiving { status: u16, headers: HashMap<String, String>, body: Vec<u8> },
Done,
}
pub struct HttpClient {
queue: Vec<InFlightRequest>,
in_flight: Vec<InFlightRequest>,
cache: HashMap<String, CacheEntry>,
rate_limiters: HashMap<String, RateLimiter>,
events: VecDeque<HttpEvent>,
pub cache_ttl: Duration,
pub max_concurrent: usize,
pub verbose: bool,
pub default_headers: HashMap<String, String>,
}
impl HttpClient {
pub fn new() -> Self {
Self {
queue: Vec::new(),
in_flight: Vec::new(),
cache: HashMap::new(),
rate_limiters: HashMap::new(),
events: VecDeque::new(),
cache_ttl: Duration::from_secs(60),
max_concurrent: 6,
verbose: false,
default_headers: HashMap::new(),
}
}
pub fn send(&mut self, mut request: HttpRequest) -> RequestId {
let id = request.id;
for (k, v) in &self.default_headers {
request.headers.entry(k.clone()).or_insert_with(|| v.clone());
}
if request.cache_policy != CachePolicy::NoCache
&& request.cache_policy != CachePolicy::NoStore
&& request.method == Method::Get
{
if let Some(entry) = self.cache.get(&request.url) {
if entry.is_fresh() || request.cache_policy == CachePolicy::UseCache {
let mut resp = entry.response.clone();
resp.from_cache = true;
self.events.push_back(HttpEvent::Success { id, response: resp });
return id;
}
if let Some(ref etag) = entry.etag.clone() {
request.headers.insert("If-None-Match".into(), etag.clone());
}
if let Some(ref lm) = entry.last_modified.clone() {
request.headers.insert("If-Modified-Since".into(), lm.clone());
}
}
}
self.queue.push(InFlightRequest {
request,
attempt: 0,
started: Instant::now(),
retry_after: None,
state: RequestState::Pending,
});
id
}
pub fn cancel_by_tag(&mut self, tag: &str) {
let cancelled: Vec<RequestId> = self.queue.iter()
.chain(self.in_flight.iter())
.filter(|r| r.request.tag.as_deref() == Some(tag))
.map(|r| r.request.id)
.collect();
for id in cancelled {
self.events.push_back(HttpEvent::Cancelled { id });
}
self.queue.retain(|r| r.request.tag.as_deref() != Some(tag));
self.in_flight.retain(|r| r.request.tag.as_deref() != Some(tag));
}
pub fn set_rate_limit(&mut self, base_url: &str, limit: u32, window_secs: f32) {
self.rate_limiters.insert(base_url.to_owned(), RateLimiter::new(limit, window_secs));
}
pub fn set_default_header(&mut self, key: impl Into<String>, val: impl Into<String>) {
self.default_headers.insert(key.into(), val.into());
}
pub fn tick(&mut self, dt: f32) {
self.queue.sort_by_key(|r| -r.request.priority);
while self.in_flight.len() < self.max_concurrent && !self.queue.is_empty() {
let mut req = self.queue.remove(0);
let base = base_url(&req.request.url);
if let Some(limiter) = self.rate_limiters.get_mut(&base) {
if !limiter.try_consume() {
let wait_ms = (limiter.wait_time() * 1000.0) as u64;
self.events.push_back(HttpEvent::RateLimited {
id: req.request.id,
delay_ms: wait_ms,
});
req.retry_after = Some(Instant::now() + Duration::from_millis(wait_ms));
self.queue.push(req);
continue;
}
}
req.state = RequestState::Sent { sent_at: Instant::now() };
self.in_flight.push(req);
}
let mut completed = Vec::new();
for (i, req) in self.in_flight.iter_mut().enumerate() {
if req.started.elapsed() > req.request.timeout {
completed.push((i, None::<HttpResponse>, true));
continue;
}
if let RequestState::Sent { sent_at } = req.state {
let simulated_latency = Duration::from_millis(50);
if sent_at.elapsed() >= simulated_latency {
let response = HttpResponse {
status: 200,
headers: HashMap::new(),
body: Vec::new(),
text: Some(String::new()),
latency: sent_at.elapsed(),
from_cache: false,
};
completed.push((i, Some(response), false));
}
}
}
for (i, resp, timed_out) in completed.into_iter().rev() {
let req = self.in_flight.remove(i);
let id = req.request.id;
if timed_out {
if req.attempt < req.request.max_retries {
let backoff = backoff_duration(req.attempt);
self.queue.push(InFlightRequest {
attempt: req.attempt + 1,
started: Instant::now(),
retry_after: Some(Instant::now() + backoff),
state: RequestState::Pending,
..req
});
} else {
self.events.push_back(HttpEvent::Timeout {
id,
url: req.request.url.clone(),
});
}
continue;
}
if let Some(response) = resp {
if response.is_success() && req.request.method == Method::Get
&& req.request.cache_policy != CachePolicy::NoStore
{
self.cache.insert(req.request.url.clone(), CacheEntry {
etag: response.etag().map(|s| s.to_owned()),
last_modified: response.last_modified().map(|s| s.to_owned()),
stored_at: Instant::now(),
ttl: self.cache_ttl,
response: response.clone(),
});
}
self.events.push_back(HttpEvent::Success { id, response });
}
}
}
pub fn drain_events(&mut self) -> impl Iterator<Item = HttpEvent> + '_ {
self.events.drain(..)
}
pub fn pending_count(&self) -> usize {
self.queue.len() + self.in_flight.len()
}
pub fn clear_cache(&mut self) { self.cache.clear(); }
pub fn evict_stale_cache(&mut self) {
self.cache.retain(|_, entry| entry.is_fresh());
}
}
impl Default for HttpClient {
fn default() -> Self { Self::new() }
}
fn base_url(url: &str) -> String {
if let Some(after_scheme) = url.strip_prefix("https://").or_else(|| url.strip_prefix("http://")) {
let host_end = after_scheme.find('/').unwrap_or(after_scheme.len());
let scheme = if url.starts_with("https") { "https" } else { "http" };
format!("{}://{}", scheme, &after_scheme[..host_end])
} else {
url.to_owned()
}
}
fn backoff_duration(attempt: u32) -> Duration {
let base_ms = 200u64 * (1u64 << attempt.min(7));
let jitter = simple_hash(attempt as u64) % (base_ms / 4).max(1);
Duration::from_millis((base_ms + jitter).min(30_000))
}
fn simple_hash(n: u64) -> u64 {
let mut x = n ^ (n >> 33);
x = x.wrapping_mul(0xff51afd7ed558ccd);
x ^= x >> 33;
x = x.wrapping_mul(0xc4ceb9fe1a85ec53);
x ^= x >> 33;
x
}