1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use reqwest::{header, Client, RequestBuilder, Response};
5use tracing::{debug, error};
6
7use crate::config::Config;
8use crate::error::{ConsulError, Result};
9use crate::types::{QueryMeta, QueryOptions, WriteMeta, WriteOptions};
10
/// Thin wrapper around a `reqwest` client that carries the shared Consul
/// configuration used to build request URLs and default credentials.
pub struct HttpClient {
    /// Shared configuration; held in an `Arc` so clones of the client reuse it.
    config: Arc<Config>,
    /// Underlying `reqwest` client used to issue every request.
    client: Client,
}
16
17impl HttpClient {
18 pub fn new(config: Config) -> Result<Self> {
20 config.validate()?;
21
22 let mut builder = Client::builder()
23 .timeout(config.timeout)
24 .pool_max_idle_per_host(10);
25
26 if let Some(ref tls) = config.tls_config {
28 if tls.insecure_skip_verify {
29 builder = builder.danger_accept_invalid_certs(true);
30 }
31
32 }
34
35 let client = builder.build().map_err(ConsulError::HttpError)?;
36
37 Ok(Self {
38 config: Arc::new(config),
39 client,
40 })
41 }
42
43 pub fn config(&self) -> &Config {
45 &self.config
46 }
47
48 pub fn url(&self, path: &str) -> String {
50 format!("{}{}", self.config.base_url(), path)
51 }
52
53 pub fn get(&self, path: &str) -> RequestBuilder {
55 let url = self.url(path);
56 debug!("GET {}", url);
57 self.apply_defaults(self.client.get(&url))
58 }
59
60 pub fn put(&self, path: &str) -> RequestBuilder {
62 let url = self.url(path);
63 debug!("PUT {}", url);
64 self.apply_defaults(self.client.put(&url))
65 }
66
67 pub fn post(&self, path: &str) -> RequestBuilder {
69 let url = self.url(path);
70 debug!("POST {}", url);
71 self.apply_defaults(self.client.post(&url))
72 }
73
74 pub fn delete(&self, path: &str) -> RequestBuilder {
76 let url = self.url(path);
77 debug!("DELETE {}", url);
78 self.apply_defaults(self.client.delete(&url))
79 }
80
81 fn apply_defaults(&self, mut builder: RequestBuilder) -> RequestBuilder {
83 if let Some(ref token) = self.config.token {
85 builder = builder.header("X-Consul-Token", token);
86 }
87
88 if let Some(ref auth) = self.config.http_auth {
90 builder = builder.basic_auth(&auth.username, Some(&auth.password));
91 }
92
93 builder
94 }
95
96 pub fn apply_query_options(&self, mut builder: RequestBuilder, opts: &QueryOptions) -> RequestBuilder {
98 let mut params: Vec<(&str, String)> = Vec::new();
99
100 if let Some(ref dc) = opts.datacenter {
101 params.push(("dc", dc.clone()));
102 } else if let Some(ref dc) = self.config.datacenter {
103 params.push(("dc", dc.clone()));
104 }
105
106 if let Some(ref token) = opts.token {
107 builder = builder.header("X-Consul-Token", token);
108 }
109
110 if let Some(ref ns) = opts.namespace {
111 params.push(("ns", ns.clone()));
112 } else if let Some(ref ns) = self.config.namespace {
113 params.push(("ns", ns.clone()));
114 }
115
116 if let Some(ref partition) = opts.partition {
117 params.push(("partition", partition.clone()));
118 } else if let Some(ref partition) = self.config.partition {
119 params.push(("partition", partition.clone()));
120 }
121
122 if opts.allow_stale {
123 params.push(("stale", String::new()));
124 }
125
126 if opts.require_consistent {
127 params.push(("consistent", String::new()));
128 }
129
130 if opts.wait_index > 0 {
131 params.push(("index", opts.wait_index.to_string()));
132 }
133
134 if let Some(wait_time) = opts.wait_time {
135 params.push(("wait", format!("{}s", wait_time.as_secs())));
136 }
137
138 if let Some(ref near) = opts.near {
139 params.push(("near", near.clone()));
140 }
141
142 for (key, value) in &opts.node_meta {
143 params.push(("node-meta", format!("{}:{}", key, value)));
144 }
145
146 if let Some(ref filter) = opts.filter {
147 params.push(("filter", filter.clone()));
148 }
149
150 if opts.use_cache {
151 builder = builder.header(header::CACHE_CONTROL, "");
152 if let Some(max_age) = opts.max_age {
153 builder = builder.header(header::CACHE_CONTROL, format!("max-age={}", max_age.as_secs()));
154 }
155 if let Some(stale_if_error) = opts.stale_if_error {
156 builder = builder.header(
157 header::CACHE_CONTROL,
158 format!("stale-if-error={}", stale_if_error.as_secs()),
159 );
160 }
161 }
162
163 if !params.is_empty() {
164 builder = builder.query(¶ms);
165 }
166
167 builder
168 }
169
170 pub fn apply_write_options(&self, mut builder: RequestBuilder, opts: &WriteOptions) -> RequestBuilder {
172 let mut params: Vec<(&str, String)> = Vec::new();
173
174 if let Some(ref dc) = opts.datacenter {
175 params.push(("dc", dc.clone()));
176 } else if let Some(ref dc) = self.config.datacenter {
177 params.push(("dc", dc.clone()));
178 }
179
180 if let Some(ref token) = opts.token {
181 builder = builder.header("X-Consul-Token", token);
182 }
183
184 if let Some(ref ns) = opts.namespace {
185 params.push(("ns", ns.clone()));
186 } else if let Some(ref ns) = self.config.namespace {
187 params.push(("ns", ns.clone()));
188 }
189
190 if let Some(ref partition) = opts.partition {
191 params.push(("partition", partition.clone()));
192 } else if let Some(ref partition) = self.config.partition {
193 params.push(("partition", partition.clone()));
194 }
195
196 if !params.is_empty() {
197 builder = builder.query(¶ms);
198 }
199
200 builder
201 }
202
203 pub async fn execute(&self, builder: RequestBuilder) -> Result<Response> {
205 let response = builder.send().await.map_err(ConsulError::HttpError)?;
206 Ok(response)
207 }
208
209 pub async fn execute_json<T: serde::de::DeserializeOwned>(
211 &self,
212 builder: RequestBuilder,
213 ) -> Result<T> {
214 let response = self.execute(builder).await?;
215 self.handle_response_json(response).await
216 }
217
218 pub async fn query<T: serde::de::DeserializeOwned>(
220 &self,
221 builder: RequestBuilder,
222 ) -> Result<(T, QueryMeta)> {
223 let start = Instant::now();
224 let response = self.execute(builder).await?;
225 let meta = self.parse_query_meta(&response, start.elapsed());
226 let data = self.handle_response_json(response).await?;
227 Ok((data, meta))
228 }
229
230 pub async fn write<T: serde::de::DeserializeOwned>(
232 &self,
233 builder: RequestBuilder,
234 ) -> Result<(T, WriteMeta)> {
235 let start = Instant::now();
236 let response = self.execute(builder).await?;
237 let meta = WriteMeta {
238 request_time: start.elapsed(),
239 };
240 let data = self.handle_response_json(response).await?;
241 Ok((data, meta))
242 }
243
244 pub async fn write_bool(&self, builder: RequestBuilder) -> Result<(bool, WriteMeta)> {
246 let start = Instant::now();
247 let response = self.execute(builder).await?;
248 let meta = WriteMeta {
249 request_time: start.elapsed(),
250 };
251 let status = response.status();
252
253 if status.is_success() {
254 let text = response.text().await.map_err(ConsulError::HttpError)?;
255 let result = text.trim() == "true";
256 Ok((result, meta))
257 } else {
258 let text = response.text().await.unwrap_or_default();
259 Err(ConsulError::api_error(status.as_u16(), text))
260 }
261 }
262
263 pub async fn write_empty(&self, builder: RequestBuilder) -> Result<WriteMeta> {
265 let start = Instant::now();
266 let response = self.execute(builder).await?;
267 let status = response.status();
268
269 if status.is_success() {
270 Ok(WriteMeta {
271 request_time: start.elapsed(),
272 })
273 } else {
274 let text = response.text().await.unwrap_or_default();
275 Err(ConsulError::api_error(status.as_u16(), text))
276 }
277 }
278
279 async fn handle_response_json<T: serde::de::DeserializeOwned>(
281 &self,
282 response: Response,
283 ) -> Result<T> {
284 let status = response.status();
285
286 if status.is_success() {
287 response.json::<T>().await.map_err(ConsulError::HttpError)
288 } else {
289 let text = response.text().await.unwrap_or_default();
290 error!("API error: {} - {}", status, text);
291 Err(ConsulError::api_error(status.as_u16(), text))
292 }
293 }
294
295 fn parse_query_meta(&self, response: &Response, request_time: Duration) -> QueryMeta {
297 let mut meta = QueryMeta {
298 request_time,
299 ..Default::default()
300 };
301
302 if let Some(index) = response.headers().get("X-Consul-Index") {
303 if let Ok(s) = index.to_str() {
304 meta.last_index = s.parse().unwrap_or(0);
305 }
306 }
307
308 if let Some(leader) = response.headers().get("X-Consul-KnownLeader") {
309 if let Ok(s) = leader.to_str() {
310 meta.known_leader = s == "true";
311 }
312 }
313
314 if let Some(contact) = response.headers().get("X-Consul-LastContact") {
315 if let Ok(s) = contact.to_str() {
316 if let Ok(ms) = s.parse::<u64>() {
317 meta.last_contact = Duration::from_millis(ms);
318 }
319 }
320 }
321
322 if let Some(translate) = response.headers().get("X-Consul-Translate-Addresses") {
323 if let Ok(s) = translate.to_str() {
324 meta.address_translation_enabled = s == "true";
325 }
326 }
327
328 if let Some(hit) = response.headers().get("X-Cache") {
329 if let Ok(s) = hit.to_str() {
330 meta.cache_hit = s == "HIT";
331 }
332 }
333
334 if let Some(age) = response.headers().get("Age") {
335 if let Ok(s) = age.to_str() {
336 if let Ok(secs) = s.parse::<u64>() {
337 meta.cache_age = Some(Duration::from_secs(secs));
338 }
339 }
340 }
341
342 meta
343 }
344
345 pub async fn ping(&self) -> Result<()> {
347 let response = self.get("/v1/status/leader").send().await.map_err(ConsulError::HttpError)?;
348
349 if response.status().is_success() {
350 Ok(())
351 } else {
352 Err(ConsulError::api_error(
353 response.status().as_u16(),
354 "failed to connect to Consul",
355 ))
356 }
357 }
358}
359
360impl Clone for HttpClient {
361 fn clone(&self) -> Self {
362 Self {
363 config: self.config.clone(),
364 client: self.client.clone(),
365 }
366 }
367}