batata_consul_client/
client.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use reqwest::{header, Client, RequestBuilder, Response};
5use tracing::{debug, error};
6
7use crate::config::Config;
8use crate::error::{ConsulError, Result};
9use crate::types::{QueryMeta, QueryOptions, WriteMeta, WriteOptions};
10
/// HTTP client for Consul API
///
/// Pairs the client configuration with the `reqwest` client that is used to
/// build and send requests.
pub struct HttpClient {
    /// Client configuration, held behind an `Arc`.
    config: Arc<Config>,
    /// Underlying `reqwest` client used to build and send requests.
    client: Client,
}
16
17impl HttpClient {
18    /// Create a new HTTP client with the given configuration
19    pub fn new(config: Config) -> Result<Self> {
20        config.validate()?;
21
22        let mut builder = Client::builder()
23            .timeout(config.timeout)
24            .pool_max_idle_per_host(10);
25
26        // Configure TLS
27        if let Some(ref tls) = config.tls_config {
28            if tls.insecure_skip_verify {
29                builder = builder.danger_accept_invalid_certs(true);
30            }
31
32            // TODO: Add CA cert and client cert configuration
33        }
34
35        let client = builder.build().map_err(ConsulError::HttpError)?;
36
37        Ok(Self {
38            config: Arc::new(config),
39            client,
40        })
41    }
42
43    /// Get the configuration
44    pub fn config(&self) -> &Config {
45        &self.config
46    }
47
48    /// Build the full URL for an API path
49    pub fn url(&self, path: &str) -> String {
50        format!("{}{}", self.config.base_url(), path)
51    }
52
53    /// Create a GET request builder
54    pub fn get(&self, path: &str) -> RequestBuilder {
55        let url = self.url(path);
56        debug!("GET {}", url);
57        self.apply_defaults(self.client.get(&url))
58    }
59
60    /// Create a PUT request builder
61    pub fn put(&self, path: &str) -> RequestBuilder {
62        let url = self.url(path);
63        debug!("PUT {}", url);
64        self.apply_defaults(self.client.put(&url))
65    }
66
67    /// Create a POST request builder
68    pub fn post(&self, path: &str) -> RequestBuilder {
69        let url = self.url(path);
70        debug!("POST {}", url);
71        self.apply_defaults(self.client.post(&url))
72    }
73
74    /// Create a DELETE request builder
75    pub fn delete(&self, path: &str) -> RequestBuilder {
76        let url = self.url(path);
77        debug!("DELETE {}", url);
78        self.apply_defaults(self.client.delete(&url))
79    }
80
81    /// Apply default headers and authentication
82    fn apply_defaults(&self, mut builder: RequestBuilder) -> RequestBuilder {
83        // Add token if configured
84        if let Some(ref token) = self.config.token {
85            builder = builder.header("X-Consul-Token", token);
86        }
87
88        // Add HTTP basic auth if configured
89        if let Some(ref auth) = self.config.http_auth {
90            builder = builder.basic_auth(&auth.username, Some(&auth.password));
91        }
92
93        builder
94    }
95
96    /// Apply query options to a request
97    pub fn apply_query_options(&self, mut builder: RequestBuilder, opts: &QueryOptions) -> RequestBuilder {
98        let mut params: Vec<(&str, String)> = Vec::new();
99
100        if let Some(ref dc) = opts.datacenter {
101            params.push(("dc", dc.clone()));
102        } else if let Some(ref dc) = self.config.datacenter {
103            params.push(("dc", dc.clone()));
104        }
105
106        if let Some(ref token) = opts.token {
107            builder = builder.header("X-Consul-Token", token);
108        }
109
110        if let Some(ref ns) = opts.namespace {
111            params.push(("ns", ns.clone()));
112        } else if let Some(ref ns) = self.config.namespace {
113            params.push(("ns", ns.clone()));
114        }
115
116        if let Some(ref partition) = opts.partition {
117            params.push(("partition", partition.clone()));
118        } else if let Some(ref partition) = self.config.partition {
119            params.push(("partition", partition.clone()));
120        }
121
122        if opts.allow_stale {
123            params.push(("stale", String::new()));
124        }
125
126        if opts.require_consistent {
127            params.push(("consistent", String::new()));
128        }
129
130        if opts.wait_index > 0 {
131            params.push(("index", opts.wait_index.to_string()));
132        }
133
134        if let Some(wait_time) = opts.wait_time {
135            params.push(("wait", format!("{}s", wait_time.as_secs())));
136        }
137
138        if let Some(ref near) = opts.near {
139            params.push(("near", near.clone()));
140        }
141
142        for (key, value) in &opts.node_meta {
143            params.push(("node-meta", format!("{}:{}", key, value)));
144        }
145
146        if let Some(ref filter) = opts.filter {
147            params.push(("filter", filter.clone()));
148        }
149
150        if opts.use_cache {
151            builder = builder.header(header::CACHE_CONTROL, "");
152            if let Some(max_age) = opts.max_age {
153                builder = builder.header(header::CACHE_CONTROL, format!("max-age={}", max_age.as_secs()));
154            }
155            if let Some(stale_if_error) = opts.stale_if_error {
156                builder = builder.header(
157                    header::CACHE_CONTROL,
158                    format!("stale-if-error={}", stale_if_error.as_secs()),
159                );
160            }
161        }
162
163        if !params.is_empty() {
164            builder = builder.query(&params);
165        }
166
167        builder
168    }
169
170    /// Apply write options to a request
171    pub fn apply_write_options(&self, mut builder: RequestBuilder, opts: &WriteOptions) -> RequestBuilder {
172        let mut params: Vec<(&str, String)> = Vec::new();
173
174        if let Some(ref dc) = opts.datacenter {
175            params.push(("dc", dc.clone()));
176        } else if let Some(ref dc) = self.config.datacenter {
177            params.push(("dc", dc.clone()));
178        }
179
180        if let Some(ref token) = opts.token {
181            builder = builder.header("X-Consul-Token", token);
182        }
183
184        if let Some(ref ns) = opts.namespace {
185            params.push(("ns", ns.clone()));
186        } else if let Some(ref ns) = self.config.namespace {
187            params.push(("ns", ns.clone()));
188        }
189
190        if let Some(ref partition) = opts.partition {
191            params.push(("partition", partition.clone()));
192        } else if let Some(ref partition) = self.config.partition {
193            params.push(("partition", partition.clone()));
194        }
195
196        if !params.is_empty() {
197            builder = builder.query(&params);
198        }
199
200        builder
201    }
202
203    /// Execute a request and return the response
204    pub async fn execute(&self, builder: RequestBuilder) -> Result<Response> {
205        let response = builder.send().await.map_err(ConsulError::HttpError)?;
206        Ok(response)
207    }
208
209    /// Execute a request and parse the response as JSON
210    pub async fn execute_json<T: serde::de::DeserializeOwned>(
211        &self,
212        builder: RequestBuilder,
213    ) -> Result<T> {
214        let response = self.execute(builder).await?;
215        self.handle_response_json(response).await
216    }
217
218    /// Execute a query request and return response with metadata
219    pub async fn query<T: serde::de::DeserializeOwned>(
220        &self,
221        builder: RequestBuilder,
222    ) -> Result<(T, QueryMeta)> {
223        let start = Instant::now();
224        let response = self.execute(builder).await?;
225        let meta = self.parse_query_meta(&response, start.elapsed());
226        let data = self.handle_response_json(response).await?;
227        Ok((data, meta))
228    }
229
230    /// Execute a write request and return response with metadata
231    pub async fn write<T: serde::de::DeserializeOwned>(
232        &self,
233        builder: RequestBuilder,
234    ) -> Result<(T, WriteMeta)> {
235        let start = Instant::now();
236        let response = self.execute(builder).await?;
237        let meta = WriteMeta {
238            request_time: start.elapsed(),
239        };
240        let data = self.handle_response_json(response).await?;
241        Ok((data, meta))
242    }
243
244    /// Execute a write request that returns a boolean
245    pub async fn write_bool(&self, builder: RequestBuilder) -> Result<(bool, WriteMeta)> {
246        let start = Instant::now();
247        let response = self.execute(builder).await?;
248        let meta = WriteMeta {
249            request_time: start.elapsed(),
250        };
251        let status = response.status();
252
253        if status.is_success() {
254            let text = response.text().await.map_err(ConsulError::HttpError)?;
255            let result = text.trim() == "true";
256            Ok((result, meta))
257        } else {
258            let text = response.text().await.unwrap_or_default();
259            Err(ConsulError::api_error(status.as_u16(), text))
260        }
261    }
262
263    /// Execute a write request that returns no content
264    pub async fn write_empty(&self, builder: RequestBuilder) -> Result<WriteMeta> {
265        let start = Instant::now();
266        let response = self.execute(builder).await?;
267        let status = response.status();
268
269        if status.is_success() {
270            Ok(WriteMeta {
271                request_time: start.elapsed(),
272            })
273        } else {
274            let text = response.text().await.unwrap_or_default();
275            Err(ConsulError::api_error(status.as_u16(), text))
276        }
277    }
278
279    /// Handle response and parse JSON
280    async fn handle_response_json<T: serde::de::DeserializeOwned>(
281        &self,
282        response: Response,
283    ) -> Result<T> {
284        let status = response.status();
285
286        if status.is_success() {
287            response.json::<T>().await.map_err(ConsulError::HttpError)
288        } else {
289            let text = response.text().await.unwrap_or_default();
290            error!("API error: {} - {}", status, text);
291            Err(ConsulError::api_error(status.as_u16(), text))
292        }
293    }
294
295    /// Parse query metadata from response headers
296    fn parse_query_meta(&self, response: &Response, request_time: Duration) -> QueryMeta {
297        let mut meta = QueryMeta {
298            request_time,
299            ..Default::default()
300        };
301
302        if let Some(index) = response.headers().get("X-Consul-Index") {
303            if let Ok(s) = index.to_str() {
304                meta.last_index = s.parse().unwrap_or(0);
305            }
306        }
307
308        if let Some(leader) = response.headers().get("X-Consul-KnownLeader") {
309            if let Ok(s) = leader.to_str() {
310                meta.known_leader = s == "true";
311            }
312        }
313
314        if let Some(contact) = response.headers().get("X-Consul-LastContact") {
315            if let Ok(s) = contact.to_str() {
316                if let Ok(ms) = s.parse::<u64>() {
317                    meta.last_contact = Duration::from_millis(ms);
318                }
319            }
320        }
321
322        if let Some(translate) = response.headers().get("X-Consul-Translate-Addresses") {
323            if let Ok(s) = translate.to_str() {
324                meta.address_translation_enabled = s == "true";
325            }
326        }
327
328        if let Some(hit) = response.headers().get("X-Cache") {
329            if let Ok(s) = hit.to_str() {
330                meta.cache_hit = s == "HIT";
331            }
332        }
333
334        if let Some(age) = response.headers().get("Age") {
335            if let Ok(s) = age.to_str() {
336                if let Ok(secs) = s.parse::<u64>() {
337                    meta.cache_age = Some(Duration::from_secs(secs));
338                }
339            }
340        }
341
342        meta
343    }
344
345    /// Check if server is reachable
346    pub async fn ping(&self) -> Result<()> {
347        let response = self.get("/v1/status/leader").send().await.map_err(ConsulError::HttpError)?;
348
349        if response.status().is_success() {
350            Ok(())
351        } else {
352            Err(ConsulError::api_error(
353                response.status().as_u16(),
354                "failed to connect to Consul",
355            ))
356        }
357    }
358}
359
360impl Clone for HttpClient {
361    fn clone(&self) -> Self {
362        Self {
363            config: self.config.clone(),
364            client: self.client.clone(),
365        }
366    }
367}