1use std::collections::{HashMap, VecDeque};
17use std::time::{Duration, Instant};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum Method { Get, Post, Put, Delete, Patch, Head, Options }
23
24impl Method {
25 pub fn as_str(self) -> &'static str {
26 match self {
27 Self::Get => "GET",
28 Self::Post => "POST",
29 Self::Put => "PUT",
30 Self::Delete => "DELETE",
31 Self::Patch => "PATCH",
32 Self::Head => "HEAD",
33 Self::Options => "OPTIONS",
34 }
35 }
36}
37
38#[derive(Debug, Clone)]
42pub struct HttpRequest {
43 pub id: RequestId,
44 pub method: Method,
45 pub url: String,
46 pub headers: HashMap<String, String>,
47 pub body: Option<Vec<u8>>,
48 pub timeout: Duration,
49 pub max_retries: u32,
50 pub priority: i32,
52 pub tag: Option<String>,
54 pub cache_policy: CachePolicy,
56}
57
58#[derive(Debug, Clone, Copy, PartialEq)]
59pub enum CachePolicy {
60 NoStore,
62 UseCache,
64 Revalidate,
66 NoCache,
68}
69
70impl HttpRequest {
71 pub fn get(url: impl Into<String>) -> Self {
72 Self::new(Method::Get, url)
73 }
74
75 pub fn post(url: impl Into<String>, body: Vec<u8>) -> Self {
76 let mut r = Self::new(Method::Post, url);
77 r.body = Some(body);
78 r
79 }
80
81 pub fn post_json(url: impl Into<String>, json: impl Into<String>) -> Self {
82 let mut r = Self::new(Method::Post, url);
83 r.body = Some(json.into().into_bytes());
84 r.headers.insert("Content-Type".into(), "application/json".into());
85 r
86 }
87
88 pub fn new(method: Method, url: impl Into<String>) -> Self {
89 Self {
90 id: RequestId::next(),
91 method,
92 url: url.into(),
93 headers: HashMap::new(),
94 body: None,
95 timeout: Duration::from_secs(10),
96 max_retries: 3,
97 priority: 0,
98 tag: None,
99 cache_policy: CachePolicy::UseCache,
100 }
101 }
102
103 pub fn with_header(mut self, key: impl Into<String>, val: impl Into<String>) -> Self {
104 self.headers.insert(key.into(), val.into());
105 self
106 }
107
108 pub fn with_timeout(mut self, t: Duration) -> Self { self.timeout = t; self }
109 pub fn with_retries(mut self, n: u32) -> Self { self.max_retries = n; self }
110 pub fn with_priority(mut self, p: i32) -> Self { self.priority = p; self }
111 pub fn with_tag(mut self, t: impl Into<String>) -> Self { self.tag = Some(t.into()); self }
112 pub fn with_cache(mut self, p: CachePolicy) -> Self { self.cache_policy = p; self }
113
114 pub fn bearer_auth(self, token: impl Into<String>) -> Self {
115 self.with_header("Authorization", format!("Bearer {}", token.into()))
116 }
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
122pub struct RequestId(pub u64);
123
124impl RequestId {
125 pub fn next() -> Self {
126 use std::sync::atomic::{AtomicU64, Ordering};
127 static COUNTER: AtomicU64 = AtomicU64::new(1);
128 Self(COUNTER.fetch_add(1, Ordering::Relaxed))
129 }
130}
131
132#[derive(Debug, Clone)]
135pub struct HttpResponse {
136 pub status: u16,
137 pub headers: HashMap<String, String>,
138 pub body: Vec<u8>,
139 pub text: Option<String>,
141 pub latency: Duration,
142 pub from_cache: bool,
143}
144
145impl HttpResponse {
146 pub fn is_success(&self) -> bool { (200..300).contains(&self.status) }
147 pub fn is_client_error(&self) -> bool { (400..500).contains(&self.status) }
148 pub fn is_server_error(&self) -> bool { (500..600).contains(&self.status) }
149 pub fn is_not_modified(&self) -> bool { self.status == 304 }
150
151 pub fn content_type(&self) -> Option<&str> {
152 self.headers.get("content-type").map(|s| s.as_str())
153 }
154
155 pub fn etag(&self) -> Option<&str> {
156 self.headers.get("etag").map(|s| s.as_str())
157 }
158
159 pub fn last_modified(&self) -> Option<&str> {
160 self.headers.get("last-modified").map(|s| s.as_str())
161 }
162
163 pub fn text_body(&self) -> &str {
164 self.text.as_deref().unwrap_or("")
165 }
166
167 pub fn json_field(&self, key: &str) -> Option<String> {
169 let text = self.text.as_ref()?;
170 let search = format!("\"{}\":", key);
172 let pos = text.find(&search)?;
173 let after = text[pos + search.len()..].trim_start();
174 if after.starts_with('"') {
175 let end = after[1..].find('"')?;
176 Some(after[1..end+1].to_owned())
177 } else {
178 let end = after.find([',', '}', '\n']).unwrap_or(after.len());
179 Some(after[..end].trim().to_owned())
180 }
181 }
182}
183
184#[derive(Debug, Clone)]
187pub enum HttpEvent {
188 Success { id: RequestId, response: HttpResponse },
190 Failure { id: RequestId, error: HttpError, url: String },
192 Timeout { id: RequestId, url: String },
194 Cancelled { id: RequestId },
196 RateLimited { id: RequestId, delay_ms: u64 },
198}
199
200#[derive(Debug, Clone)]
201pub enum HttpError {
202 ConnectionFailed(String),
204 DnsFailure(String),
206 TlsError(String),
208 ServerError(u16, String),
210 ReadError(String),
212 InvalidRequest(String),
214 RetriesExhausted { attempts: u32 },
216}
217
218impl std::fmt::Display for HttpError {
219 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 match self {
221 Self::ConnectionFailed(s) => write!(f, "Connection failed: {}", s),
222 Self::DnsFailure(s) => write!(f, "DNS failure: {}", s),
223 Self::TlsError(s) => write!(f, "TLS error: {}", s),
224 Self::ServerError(c, s) => write!(f, "HTTP {}: {}", c, s),
225 Self::ReadError(s) => write!(f, "Read error: {}", s),
226 Self::InvalidRequest(s) => write!(f, "Invalid request: {}", s),
227 Self::RetriesExhausted { attempts } => write!(f, "Failed after {} attempts", attempts),
228 }
229 }
230}
231
232#[derive(Debug, Clone)]
235struct CacheEntry {
236 response: HttpResponse,
237 etag: Option<String>,
238 last_modified: Option<String>,
239 stored_at: Instant,
240 ttl: Duration,
241}
242
243impl CacheEntry {
244 fn is_fresh(&self) -> bool {
245 self.stored_at.elapsed() < self.ttl
246 }
247}
248
249#[derive(Debug, Clone)]
253pub struct RateLimiter {
254 pub limit: u32,
256 pub window: f32,
258 tokens: f32,
260 last_refill: Option<Instant>,
261}
262
263impl RateLimiter {
264 pub fn new(limit: u32, window_secs: f32) -> Self {
265 Self { limit, window: window_secs, tokens: limit as f32, last_refill: None }
266 }
267
268 pub fn try_consume(&mut self) -> bool {
269 self.refill();
270 if self.tokens >= 1.0 {
271 self.tokens -= 1.0;
272 true
273 } else {
274 false
275 }
276 }
277
278 fn refill(&mut self) {
279 let now = Instant::now();
280 if let Some(last) = self.last_refill {
281 let elapsed = last.elapsed().as_secs_f32();
282 let rate = self.limit as f32 / self.window.max(1e-3);
283 self.tokens = (self.tokens + rate * elapsed).min(self.limit as f32);
284 }
285 self.last_refill = Some(now);
286 }
287
288 pub fn wait_time(&self) -> f32 {
290 if self.tokens >= 1.0 { return 0.0; }
291 let rate = self.limit as f32 / self.window.max(1e-3);
292 (1.0 - self.tokens) / rate.max(1e-6)
293 }
294}
295
296#[derive(Debug)]
299struct InFlightRequest {
300 request: HttpRequest,
301 attempt: u32,
302 started: Instant,
303 retry_after: Option<Instant>,
304 state: RequestState,
306}
307
308#[derive(Debug)]
309enum RequestState {
310 Pending,
312 Sent { sent_at: Instant },
314 Receiving { status: u16, headers: HashMap<String, String>, body: Vec<u8> },
316 Done,
318}
319
320pub struct HttpClient {
329 queue: Vec<InFlightRequest>,
331 in_flight: Vec<InFlightRequest>,
333 cache: HashMap<String, CacheEntry>,
335 rate_limiters: HashMap<String, RateLimiter>,
337 events: VecDeque<HttpEvent>,
339 pub cache_ttl: Duration,
341 pub max_concurrent: usize,
343 pub verbose: bool,
345 pub default_headers: HashMap<String, String>,
347}
348
349impl HttpClient {
350 pub fn new() -> Self {
351 Self {
352 queue: Vec::new(),
353 in_flight: Vec::new(),
354 cache: HashMap::new(),
355 rate_limiters: HashMap::new(),
356 events: VecDeque::new(),
357 cache_ttl: Duration::from_secs(60),
358 max_concurrent: 6,
359 verbose: false,
360 default_headers: HashMap::new(),
361 }
362 }
363
364 pub fn send(&mut self, mut request: HttpRequest) -> RequestId {
366 let id = request.id;
367
368 for (k, v) in &self.default_headers {
370 request.headers.entry(k.clone()).or_insert_with(|| v.clone());
371 }
372
373 if request.cache_policy != CachePolicy::NoCache
375 && request.cache_policy != CachePolicy::NoStore
376 && request.method == Method::Get
377 {
378 if let Some(entry) = self.cache.get(&request.url) {
379 if entry.is_fresh() || request.cache_policy == CachePolicy::UseCache {
380 let mut resp = entry.response.clone();
381 resp.from_cache = true;
382 self.events.push_back(HttpEvent::Success { id, response: resp });
383 return id;
384 }
385 if let Some(ref etag) = entry.etag.clone() {
387 request.headers.insert("If-None-Match".into(), etag.clone());
388 }
389 if let Some(ref lm) = entry.last_modified.clone() {
390 request.headers.insert("If-Modified-Since".into(), lm.clone());
391 }
392 }
393 }
394
395 self.queue.push(InFlightRequest {
396 request,
397 attempt: 0,
398 started: Instant::now(),
399 retry_after: None,
400 state: RequestState::Pending,
401 });
402
403 id
404 }
405
406 pub fn cancel_by_tag(&mut self, tag: &str) {
408 let cancelled: Vec<RequestId> = self.queue.iter()
409 .chain(self.in_flight.iter())
410 .filter(|r| r.request.tag.as_deref() == Some(tag))
411 .map(|r| r.request.id)
412 .collect();
413 for id in cancelled {
414 self.events.push_back(HttpEvent::Cancelled { id });
415 }
416 self.queue.retain(|r| r.request.tag.as_deref() != Some(tag));
417 self.in_flight.retain(|r| r.request.tag.as_deref() != Some(tag));
418 }
419
420 pub fn set_rate_limit(&mut self, base_url: &str, limit: u32, window_secs: f32) {
422 self.rate_limiters.insert(base_url.to_owned(), RateLimiter::new(limit, window_secs));
423 }
424
425 pub fn set_default_header(&mut self, key: impl Into<String>, val: impl Into<String>) {
427 self.default_headers.insert(key.into(), val.into());
428 }
429
430 pub fn tick(&mut self, dt: f32) {
432 self.queue.sort_by_key(|r| -r.request.priority);
434
435 while self.in_flight.len() < self.max_concurrent && !self.queue.is_empty() {
437 let mut req = self.queue.remove(0);
438
439 let base = base_url(&req.request.url);
441 if let Some(limiter) = self.rate_limiters.get_mut(&base) {
442 if !limiter.try_consume() {
443 let wait_ms = (limiter.wait_time() * 1000.0) as u64;
444 self.events.push_back(HttpEvent::RateLimited {
445 id: req.request.id,
446 delay_ms: wait_ms,
447 });
448 req.retry_after = Some(Instant::now() + Duration::from_millis(wait_ms));
449 self.queue.push(req);
450 continue;
451 }
452 }
453
454 req.state = RequestState::Sent { sent_at: Instant::now() };
455 self.in_flight.push(req);
456 }
457
458 let mut completed = Vec::new();
460 for (i, req) in self.in_flight.iter_mut().enumerate() {
461 if req.started.elapsed() > req.request.timeout {
463 completed.push((i, None::<HttpResponse>, true));
464 continue;
465 }
466
467 if let RequestState::Sent { sent_at } = req.state {
469 let simulated_latency = Duration::from_millis(50);
470 if sent_at.elapsed() >= simulated_latency {
471 let response = HttpResponse {
473 status: 200,
474 headers: HashMap::new(),
475 body: Vec::new(),
476 text: Some(String::new()),
477 latency: sent_at.elapsed(),
478 from_cache: false,
479 };
480 completed.push((i, Some(response), false));
481 }
482 }
483 }
484
485 for (i, resp, timed_out) in completed.into_iter().rev() {
487 let req = self.in_flight.remove(i);
488 let id = req.request.id;
489
490 if timed_out {
491 if req.attempt < req.request.max_retries {
493 let backoff = backoff_duration(req.attempt);
494 self.queue.push(InFlightRequest {
495 attempt: req.attempt + 1,
496 started: Instant::now(),
497 retry_after: Some(Instant::now() + backoff),
498 state: RequestState::Pending,
499 ..req
500 });
501 } else {
502 self.events.push_back(HttpEvent::Timeout {
503 id,
504 url: req.request.url.clone(),
505 });
506 }
507 continue;
508 }
509
510 if let Some(response) = resp {
511 if response.is_success() && req.request.method == Method::Get
513 && req.request.cache_policy != CachePolicy::NoStore
514 {
515 self.cache.insert(req.request.url.clone(), CacheEntry {
516 etag: response.etag().map(|s| s.to_owned()),
517 last_modified: response.last_modified().map(|s| s.to_owned()),
518 stored_at: Instant::now(),
519 ttl: self.cache_ttl,
520 response: response.clone(),
521 });
522 }
523 self.events.push_back(HttpEvent::Success { id, response });
524 }
525 }
526 }
527
528 pub fn drain_events(&mut self) -> impl Iterator<Item = HttpEvent> + '_ {
530 self.events.drain(..)
531 }
532
533 pub fn pending_count(&self) -> usize {
535 self.queue.len() + self.in_flight.len()
536 }
537
538 pub fn clear_cache(&mut self) { self.cache.clear(); }
540
541 pub fn evict_stale_cache(&mut self) {
543 self.cache.retain(|_, entry| entry.is_fresh());
544 }
545}
546
547impl Default for HttpClient {
548 fn default() -> Self { Self::new() }
549}
550
551fn base_url(url: &str) -> String {
554 if let Some(after_scheme) = url.strip_prefix("https://").or_else(|| url.strip_prefix("http://")) {
556 let host_end = after_scheme.find('/').unwrap_or(after_scheme.len());
557 let scheme = if url.starts_with("https") { "https" } else { "http" };
558 format!("{}://{}", scheme, &after_scheme[..host_end])
559 } else {
560 url.to_owned()
561 }
562}
563
564fn backoff_duration(attempt: u32) -> Duration {
565 let base_ms = 200u64 * (1u64 << attempt.min(7));
567 let jitter = simple_hash(attempt as u64) % (base_ms / 4).max(1);
569 Duration::from_millis((base_ms + jitter).min(30_000))
570}
571
572fn simple_hash(n: u64) -> u64 {
573 let mut x = n ^ (n >> 33);
574 x = x.wrapping_mul(0xff51afd7ed558ccd);
575 x ^= x >> 33;
576 x = x.wrapping_mul(0xc4ceb9fe1a85ec53);
577 x ^= x >> 33;
578 x
579}