1use std::fs::File;
4use std::io::{Read, Write};
5use std::path::Path;
6use std::sync::{Condvar, Mutex};
7
/// Timeout, in milliseconds, configured on the HTTP agent for each request.
const REQUEST_TIMEOUT_MS: u64 = 30 * 1_000;

/// Concurrency used when the available parallelism cannot be queried.
const DEFAULT_CONCURRENCY: usize = 16;

/// Upper bound applied to every computed or configured concurrency value.
const MAX_CONCURRENCY_CAP: usize = 32;

/// Number of retries used when `JHOL_HTTP_RETRIES` is unset or unparsable.
const DEFAULT_RETRY_COUNT: usize = 2;

/// Initial retry delay, in milliseconds, used when
/// `JHOL_HTTP_RETRY_BACKOFF_MS` is unset or unparsable.
const DEFAULT_RETRY_BACKOFF_MS: u64 = 250;
13
14fn concurrency_from_env() -> usize {
15 std::env::var("JHOL_NETWORK_CONCURRENCY")
16 .ok()
17 .and_then(|v| v.parse::<usize>().ok())
18 .map(|n| n.clamp(1, MAX_CONCURRENCY_CAP))
19 .unwrap_or_else(|| {
20 std::thread::available_parallelism()
21 .map(|n| (n.get() * 2).clamp(4, MAX_CONCURRENCY_CAP))
22 .unwrap_or(DEFAULT_CONCURRENCY)
23 })
24}
25
26fn retry_count_from_env() -> usize {
27 std::env::var("JHOL_HTTP_RETRIES")
28 .ok()
29 .and_then(|v| v.parse::<usize>().ok())
30 .unwrap_or(DEFAULT_RETRY_COUNT)
31}
32
33fn retry_backoff_ms_from_env() -> u64 {
34 std::env::var("JHOL_HTTP_RETRY_BACKOFF_MS")
35 .ok()
36 .and_then(|v| v.parse::<u64>().ok())
37 .unwrap_or(DEFAULT_RETRY_BACKOFF_MS)
38}
39
/// Counting limiter that caps how many callers may hold a slot at once.
///
/// `mutex` guards the number of slots currently held, `condvar` wakes a
/// waiter whenever a slot is released, and `max` is the ceiling on held slots.
struct ConcurrencyLimit {
    mutex: Mutex<usize>,
    condvar: Condvar,
    max: usize,
}

impl ConcurrencyLimit {
    /// Creates a limiter that allows up to `max` slots to be held at once.
    ///
    /// A `max` of zero is raised to one: with a zero ceiling the wait
    /// condition in [`acquire`](Self::acquire) could never become false, so
    /// every caller would block forever.
    fn new(max: usize) -> Self {
        Self {
            mutex: Mutex::new(0),
            condvar: Condvar::new(),
            max: max.max(1),
        }
    }

    /// Blocks until a slot is free, claims it, and returns a guard that gives
    /// the slot back when dropped.
    fn acquire(&self) -> ConcurrencyGuard<'_> {
        // The counter is a plain integer that is never left half-updated, so a
        // poisoned lock is recovered instead of turned into a panic.
        let held = self
            .mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut held = self
            .condvar
            .wait_while(held, |in_use| *in_use >= self.max)
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *held += 1;
        ConcurrencyGuard(self)
    }
}

/// Slot handed out by [`ConcurrencyLimit::acquire`]; dropping it releases the
/// slot and wakes one waiter.
#[must_use = "the slot is released as soon as the guard is dropped"]
struct ConcurrencyGuard<'a>(&'a ConcurrencyLimit);

impl Drop for ConcurrencyGuard<'_> {
    fn drop(&mut self) {
        // Never panic here: a panic raised while another panic is unwinding
        // aborts the process, so a poisoned lock is recovered instead.
        let mut held = self
            .0
            .mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *held = held.saturating_sub(1);
        self.0.condvar.notify_one();
    }
}
75
76pub struct HttpClient {
78 agent: ureq::Agent,
79 limit: ConcurrencyLimit,
80}
81
82impl HttpClient {
83 pub fn new(max_concurrent: usize) -> Self {
84 let agent = ureq::AgentBuilder::new()
85 .timeout(std::time::Duration::from_millis(REQUEST_TIMEOUT_MS))
86 .build();
87 Self {
88 agent,
89 limit: ConcurrencyLimit::new(max_concurrent),
90 }
91 }
92
93 pub fn get(&self, url: &str) -> Result<Vec<u8>, String> {
95 self.get_with_accept(url, None)
96 }
97
98 pub fn get_with_accept(&self, url: &str, accept: Option<&str>) -> Result<Vec<u8>, String> {
100 let _guard = self.limit.acquire();
101 let resp = self.send_with_retry(|| {
102 let req = self.agent.get(url);
103 match accept {
104 Some(h) => req.set("Accept", h).call(),
105 None => req.call(),
106 }
107 })?;
108 let mut buf = Vec::new();
109 resp.into_reader()
110 .read_to_end(&mut buf)
111 .map_err(|e| e.to_string())?;
112 Ok(buf)
113 }
114
115 pub fn post_json(&self, url: &str, body: &[u8]) -> Result<Vec<u8>, String> {
117 let _guard = self.limit.acquire();
118 let resp = self.send_with_retry(|| {
119 self.agent
120 .post(url)
121 .set("Content-Type", "application/json")
122 .send_bytes(body)
123 })?;
124 let mut buf = Vec::new();
125 resp.into_reader()
126 .read_to_end(&mut buf)
127 .map_err(|e| e.to_string())?;
128 Ok(buf)
129 }
130
131 pub fn get_to_file(&self, url: &str, dest: &Path) -> Result<(), String> {
133 let _guard = self.limit.acquire();
134 let resp = self.send_with_retry(|| self.agent.get(url).call())?;
135 let mut out = File::create(dest).map_err(|e| e.to_string())?;
136 let mut reader = resp.into_reader();
137 std::io::copy(&mut reader, &mut out).map_err(|e| e.to_string())?;
138 out.flush().map_err(|e| e.to_string())?;
139 Ok(())
140 }
141
142 fn send_with_retry<F>(&self, mut send: F) -> Result<ureq::Response, String>
143 where
144 F: FnMut() -> Result<ureq::Response, ureq::Error>,
145 {
146 let retries = retry_count_from_env();
147 let mut attempt = 0usize;
148 let mut backoff = retry_backoff_ms_from_env();
149 loop {
150 attempt += 1;
151 match send() {
152 Ok(resp) => {
153 if resp.status() == 200 {
154 return Ok(resp);
155 }
156 let status = resp.status();
157 if attempt <= retries && (status >= 500 || status == 429) {
158 std::thread::sleep(std::time::Duration::from_millis(backoff));
159 backoff = backoff.saturating_mul(2).min(5_000);
160 continue;
161 }
162 return Err(format!("HTTP {}", status));
163 }
164 Err(ureq::Error::Status(code, _resp)) => {
165 if attempt <= retries && (code >= 500 || code == 429) {
166 std::thread::sleep(std::time::Duration::from_millis(backoff));
167 backoff = backoff.saturating_mul(2).min(5_000);
168 continue;
169 }
170 return Err(format!("HTTP {}", code));
171 }
172 Err(e) => {
173 if attempt <= retries {
174 std::thread::sleep(std::time::Duration::from_millis(backoff));
175 backoff = backoff.saturating_mul(2).min(5_000);
176 continue;
177 }
178 return Err(e.to_string());
179 }
180 }
181 }
182 }
183}
184
185static CLIENT: std::sync::OnceLock<HttpClient> = std::sync::OnceLock::new();
186
187fn get_global() -> &'static HttpClient {
188 CLIENT.get_or_init(|| HttpClient::new(concurrency_from_env()))
189}
190
191pub fn get(url: &str) -> Result<Vec<u8>, String> {
193 get_global().get(url)
194}
195
196pub fn get_with_accept(url: &str, accept: Option<&str>) -> Result<Vec<u8>, String> {
198 get_global().get_with_accept(url, accept)
199}
200
201pub fn get_to_file(url: &str, dest: &Path) -> Result<(), String> {
203 get_global().get_to_file(url, dest)
204}
205
206pub fn post_json(url: &str, body: &[u8]) -> Result<Vec<u8>, String> {
208 get_global().post_json(url, body)
209}