Skip to main content

jhol_core/
http_client.rs

1//! Bounded HTTP client: connection reuse via a single Agent, capped concurrency.
2
3use std::fs::File;
4use std::io::{Read, Write};
5use std::path::Path;
6use std::sync::{Condvar, Mutex};
7
/// Timeout handed to the `ureq` agent builder, in milliseconds.
const REQUEST_TIMEOUT_MS: u64 = 30_000;
/// Concurrency used when `JHOL_NETWORK_CONCURRENCY` is unset or unparsable and the
/// available parallelism cannot be queried.
const DEFAULT_CONCURRENCY: usize = 16;
/// Upper bound applied to both the env-configured and the auto-detected concurrency.
const MAX_CONCURRENCY_CAP: usize = 32;
/// Number of retries used when `JHOL_HTTP_RETRIES` is unset or unparsable.
const DEFAULT_RETRY_COUNT: usize = 2;
/// Initial retry backoff (milliseconds) used when `JHOL_HTTP_RETRY_BACKOFF_MS` is unset
/// or unparsable.
const DEFAULT_RETRY_BACKOFF_MS: u64 = 250;
13
14fn concurrency_from_env() -> usize {
15    std::env::var("JHOL_NETWORK_CONCURRENCY")
16        .ok()
17        .and_then(|v| v.parse::<usize>().ok())
18        .map(|n| n.clamp(1, MAX_CONCURRENCY_CAP))
19        .unwrap_or_else(|| {
20            std::thread::available_parallelism()
21                .map(|n| (n.get() * 2).clamp(4, MAX_CONCURRENCY_CAP))
22                .unwrap_or(DEFAULT_CONCURRENCY)
23        })
24}
25
26fn retry_count_from_env() -> usize {
27    std::env::var("JHOL_HTTP_RETRIES")
28        .ok()
29        .and_then(|v| v.parse::<usize>().ok())
30        .unwrap_or(DEFAULT_RETRY_COUNT)
31}
32
33fn retry_backoff_ms_from_env() -> u64 {
34    std::env::var("JHOL_HTTP_RETRY_BACKOFF_MS")
35        .ok()
36        .and_then(|v| v.parse::<u64>().ok())
37        .unwrap_or(DEFAULT_RETRY_BACKOFF_MS)
38}
39
/// Semaphore-style limit: wait until a slot is free, then hold until guard is dropped.
///
/// The mutex protects the number of slots currently handed out; the condition variable wakes
/// one waiter each time a slot is returned.
struct ConcurrencyLimit {
    /// Number of slots currently held by live `ConcurrencyGuard`s.
    mutex: Mutex<usize>,
    /// Signalled once per released slot.
    condvar: Condvar,
    /// Maximum number of slots that may be held at once (always at least 1).
    max: usize,
}

impl ConcurrencyLimit {
    /// Creates a limit that admits up to `max` simultaneous holders.
    ///
    /// A `max` of 0 is raised to 1: with zero slots `acquire` could never return and every
    /// caller would block forever.
    fn new(max: usize) -> Self {
        Self {
            mutex: Mutex::new(0),
            condvar: Condvar::new(),
            max: max.max(1),
        }
    }

    /// Blocks the calling thread until a slot is free, takes it, and returns a guard that
    /// gives the slot back when dropped.
    fn acquire(&self) -> ConcurrencyGuard<'_> {
        // The counter is a plain integer that is always left consistent, so a poisoned mutex
        // is safe to keep using; recover instead of propagating the panic.
        let held = self
            .mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut held = self
            .condvar
            .wait_while(held, |in_flight| *in_flight >= self.max)
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *held += 1;
        ConcurrencyGuard(self)
    }
}

/// Holds one slot of a `ConcurrencyLimit`; the slot is returned on drop.
#[must_use = "the slot is released as soon as the guard is dropped"]
struct ConcurrencyGuard<'a>(&'a ConcurrencyLimit);

impl Drop for ConcurrencyGuard<'_> {
    fn drop(&mut self) {
        // Never panic here: this may run while the thread is already unwinding, and a second
        // panic would abort the process.
        let mut held = self
            .0
            .mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *held = held.saturating_sub(1);
        // Release the lock before waking a waiter so it can take the slot immediately.
        drop(held);
        self.0.condvar.notify_one();
    }
}
75
76/// HTTP client: one Agent (connection reuse), bounded concurrent requests.
77pub struct HttpClient {
78    agent: ureq::Agent,
79    limit: ConcurrencyLimit,
80}
81
82impl HttpClient {
83    pub fn new(max_concurrent: usize) -> Self {
84        let agent = ureq::AgentBuilder::new()
85            .timeout(std::time::Duration::from_millis(REQUEST_TIMEOUT_MS))
86            .build();
87        Self {
88            agent,
89            limit: ConcurrencyLimit::new(max_concurrent),
90        }
91    }
92
93    /// GET url and return body bytes.
94    pub fn get(&self, url: &str) -> Result<Vec<u8>, String> {
95        self.get_with_accept(url, None)
96    }
97
98    /// GET url with optional Accept header (e.g. for abbreviated packument).
99    pub fn get_with_accept(&self, url: &str, accept: Option<&str>) -> Result<Vec<u8>, String> {
100        let _guard = self.limit.acquire();
101        let resp = self.send_with_retry(|| {
102            let req = self.agent.get(url);
103            match accept {
104                Some(h) => req.set("Accept", h).call(),
105                None => req.call(),
106            }
107        })?;
108        let mut buf = Vec::new();
109        resp.into_reader()
110            .read_to_end(&mut buf)
111            .map_err(|e| e.to_string())?;
112        Ok(buf)
113    }
114
115    /// POST body to url (e.g. JSON). Content-Type: application/json.
116    pub fn post_json(&self, url: &str, body: &[u8]) -> Result<Vec<u8>, String> {
117        let _guard = self.limit.acquire();
118        let resp = self.send_with_retry(|| {
119            self.agent
120                .post(url)
121                .set("Content-Type", "application/json")
122                .send_bytes(body)
123        })?;
124        let mut buf = Vec::new();
125        resp.into_reader()
126            .read_to_end(&mut buf)
127            .map_err(|e| e.to_string())?;
128        Ok(buf)
129    }
130
131    /// GET url and write body to file (for tarballs).
132    pub fn get_to_file(&self, url: &str, dest: &Path) -> Result<(), String> {
133        let _guard = self.limit.acquire();
134        let resp = self.send_with_retry(|| self.agent.get(url).call())?;
135        let mut out = File::create(dest).map_err(|e| e.to_string())?;
136        let mut reader = resp.into_reader();
137        std::io::copy(&mut reader, &mut out).map_err(|e| e.to_string())?;
138        out.flush().map_err(|e| e.to_string())?;
139        Ok(())
140    }
141
142    fn send_with_retry<F>(&self, mut send: F) -> Result<ureq::Response, String>
143    where
144        F: FnMut() -> Result<ureq::Response, ureq::Error>,
145    {
146        let retries = retry_count_from_env();
147        let mut attempt = 0usize;
148        let mut backoff = retry_backoff_ms_from_env();
149        loop {
150            attempt += 1;
151            match send() {
152                Ok(resp) => {
153                    if resp.status() == 200 {
154                        return Ok(resp);
155                    }
156                    let status = resp.status();
157                    if attempt <= retries && (status >= 500 || status == 429) {
158                        std::thread::sleep(std::time::Duration::from_millis(backoff));
159                        backoff = backoff.saturating_mul(2).min(5_000);
160                        continue;
161                    }
162                    return Err(format!("HTTP {}", status));
163                }
164                Err(ureq::Error::Status(code, _resp)) => {
165                    if attempt <= retries && (code >= 500 || code == 429) {
166                        std::thread::sleep(std::time::Duration::from_millis(backoff));
167                        backoff = backoff.saturating_mul(2).min(5_000);
168                        continue;
169                    }
170                    return Err(format!("HTTP {}", code));
171                }
172                Err(e) => {
173                    if attempt <= retries {
174                        std::thread::sleep(std::time::Duration::from_millis(backoff));
175                        backoff = backoff.saturating_mul(2).min(5_000);
176                        continue;
177                    }
178                    return Err(e.to_string());
179                }
180            }
181        }
182    }
183}
184
185static CLIENT: std::sync::OnceLock<HttpClient> = std::sync::OnceLock::new();
186
187fn get_global() -> &'static HttpClient {
188    CLIENT.get_or_init(|| HttpClient::new(concurrency_from_env()))
189}
190
191/// GET url and return body (uses global bounded client).
192pub fn get(url: &str) -> Result<Vec<u8>, String> {
193    get_global().get(url)
194}
195
196/// GET url with optional Accept header (e.g. application/vnd.npm.install-v1+json for abbreviated packument).
197pub fn get_with_accept(url: &str, accept: Option<&str>) -> Result<Vec<u8>, String> {
198    get_global().get_with_accept(url, accept)
199}
200
201/// GET url and write to file (uses global bounded client).
202pub fn get_to_file(url: &str, dest: &Path) -> Result<(), String> {
203    get_global().get_to_file(url, dest)
204}
205
206/// POST JSON body to url (uses global bounded client).
207pub fn post_json(url: &str, body: &[u8]) -> Result<Vec<u8>, String> {
208    get_global().post_json(url, body)
209}