batata-consul-client 0.0.2

Rust client for HashiCorp Consul or batata
Documentation
use std::sync::Arc;
use std::time::{Duration, Instant};

use reqwest::{header, Client, RequestBuilder, Response};
use tracing::{debug, error};

use crate::config::Config;
use crate::error::{ConsulError, Result};
use crate::types::{QueryMeta, QueryOptions, WriteMeta, WriteOptions};

/// HTTP client for Consul API
pub struct HttpClient {
    config: Arc<Config>,
    client: Client,
}

impl HttpClient {
    /// Create a new HTTP client with the given configuration
    pub fn new(config: Config) -> Result<Self> {
        config.validate()?;

        let mut builder = Client::builder()
            .timeout(config.timeout)
            .pool_max_idle_per_host(10);

        // Configure TLS
        if let Some(ref tls) = config.tls_config {
            if tls.insecure_skip_verify {
                builder = builder.danger_accept_invalid_certs(true);
            }

            // TODO: Add CA cert and client cert configuration
        }

        let client = builder.build().map_err(ConsulError::HttpError)?;

        Ok(Self {
            config: Arc::new(config),
            client,
        })
    }

    /// Get the configuration
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Build the full URL for an API path
    pub fn url(&self, path: &str) -> String {
        format!("{}{}", self.config.base_url(), path)
    }

    /// Create a GET request builder
    pub fn get(&self, path: &str) -> RequestBuilder {
        let url = self.url(path);
        debug!("GET {}", url);
        self.apply_defaults(self.client.get(&url))
    }

    /// Create a PUT request builder
    pub fn put(&self, path: &str) -> RequestBuilder {
        let url = self.url(path);
        debug!("PUT {}", url);
        self.apply_defaults(self.client.put(&url))
    }

    /// Create a POST request builder
    pub fn post(&self, path: &str) -> RequestBuilder {
        let url = self.url(path);
        debug!("POST {}", url);
        self.apply_defaults(self.client.post(&url))
    }

    /// Create a DELETE request builder
    pub fn delete(&self, path: &str) -> RequestBuilder {
        let url = self.url(path);
        debug!("DELETE {}", url);
        self.apply_defaults(self.client.delete(&url))
    }

    /// Apply default headers and authentication
    fn apply_defaults(&self, mut builder: RequestBuilder) -> RequestBuilder {
        // Add token if configured
        if let Some(ref token) = self.config.token {
            builder = builder.header("X-Consul-Token", token);
        }

        // Add HTTP basic auth if configured
        if let Some(ref auth) = self.config.http_auth {
            builder = builder.basic_auth(&auth.username, Some(&auth.password));
        }

        builder
    }

    /// Apply query options to a request
    pub fn apply_query_options(&self, mut builder: RequestBuilder, opts: &QueryOptions) -> RequestBuilder {
        let mut params: Vec<(&str, String)> = Vec::new();

        if let Some(ref dc) = opts.datacenter {
            params.push(("dc", dc.clone()));
        } else if let Some(ref dc) = self.config.datacenter {
            params.push(("dc", dc.clone()));
        }

        if let Some(ref token) = opts.token {
            builder = builder.header("X-Consul-Token", token);
        }

        if let Some(ref ns) = opts.namespace {
            params.push(("ns", ns.clone()));
        } else if let Some(ref ns) = self.config.namespace {
            params.push(("ns", ns.clone()));
        }

        if let Some(ref partition) = opts.partition {
            params.push(("partition", partition.clone()));
        } else if let Some(ref partition) = self.config.partition {
            params.push(("partition", partition.clone()));
        }

        if opts.allow_stale {
            params.push(("stale", String::new()));
        }

        if opts.require_consistent {
            params.push(("consistent", String::new()));
        }

        if opts.wait_index > 0 {
            params.push(("index", opts.wait_index.to_string()));
        }

        if let Some(wait_time) = opts.wait_time {
            params.push(("wait", format!("{}s", wait_time.as_secs())));
        }

        if let Some(ref near) = opts.near {
            params.push(("near", near.clone()));
        }

        for (key, value) in &opts.node_meta {
            params.push(("node-meta", format!("{}:{}", key, value)));
        }

        if let Some(ref filter) = opts.filter {
            params.push(("filter", filter.clone()));
        }

        if opts.use_cache {
            builder = builder.header(header::CACHE_CONTROL, "");
            if let Some(max_age) = opts.max_age {
                builder = builder.header(header::CACHE_CONTROL, format!("max-age={}", max_age.as_secs()));
            }
            if let Some(stale_if_error) = opts.stale_if_error {
                builder = builder.header(
                    header::CACHE_CONTROL,
                    format!("stale-if-error={}", stale_if_error.as_secs()),
                );
            }
        }

        if !params.is_empty() {
            builder = builder.query(&params);
        }

        builder
    }

    /// Apply write options to a request
    pub fn apply_write_options(&self, mut builder: RequestBuilder, opts: &WriteOptions) -> RequestBuilder {
        let mut params: Vec<(&str, String)> = Vec::new();

        if let Some(ref dc) = opts.datacenter {
            params.push(("dc", dc.clone()));
        } else if let Some(ref dc) = self.config.datacenter {
            params.push(("dc", dc.clone()));
        }

        if let Some(ref token) = opts.token {
            builder = builder.header("X-Consul-Token", token);
        }

        if let Some(ref ns) = opts.namespace {
            params.push(("ns", ns.clone()));
        } else if let Some(ref ns) = self.config.namespace {
            params.push(("ns", ns.clone()));
        }

        if let Some(ref partition) = opts.partition {
            params.push(("partition", partition.clone()));
        } else if let Some(ref partition) = self.config.partition {
            params.push(("partition", partition.clone()));
        }

        if !params.is_empty() {
            builder = builder.query(&params);
        }

        builder
    }

    /// Execute a request and return the response
    pub async fn execute(&self, builder: RequestBuilder) -> Result<Response> {
        let response = builder.send().await.map_err(ConsulError::HttpError)?;
        Ok(response)
    }

    /// Execute a request and parse the response as JSON
    pub async fn execute_json<T: serde::de::DeserializeOwned>(
        &self,
        builder: RequestBuilder,
    ) -> Result<T> {
        let response = self.execute(builder).await?;
        self.handle_response_json(response).await
    }

    /// Execute a query request and return response with metadata
    pub async fn query<T: serde::de::DeserializeOwned>(
        &self,
        builder: RequestBuilder,
    ) -> Result<(T, QueryMeta)> {
        let start = Instant::now();
        let response = self.execute(builder).await?;
        let meta = self.parse_query_meta(&response, start.elapsed());
        let data = self.handle_response_json(response).await?;
        Ok((data, meta))
    }

    /// Execute a write request and return response with metadata
    pub async fn write<T: serde::de::DeserializeOwned>(
        &self,
        builder: RequestBuilder,
    ) -> Result<(T, WriteMeta)> {
        let start = Instant::now();
        let response = self.execute(builder).await?;
        let meta = WriteMeta {
            request_time: start.elapsed(),
        };
        let data = self.handle_response_json(response).await?;
        Ok((data, meta))
    }

    /// Execute a write request that returns a boolean
    pub async fn write_bool(&self, builder: RequestBuilder) -> Result<(bool, WriteMeta)> {
        let start = Instant::now();
        let response = self.execute(builder).await?;
        let meta = WriteMeta {
            request_time: start.elapsed(),
        };
        let status = response.status();

        if status.is_success() {
            let text = response.text().await.map_err(ConsulError::HttpError)?;
            let result = text.trim() == "true";
            Ok((result, meta))
        } else {
            let text = response.text().await.unwrap_or_default();
            Err(ConsulError::api_error(status.as_u16(), text))
        }
    }

    /// Execute a write request that returns no content
    pub async fn write_empty(&self, builder: RequestBuilder) -> Result<WriteMeta> {
        let start = Instant::now();
        let response = self.execute(builder).await?;
        let status = response.status();

        if status.is_success() {
            Ok(WriteMeta {
                request_time: start.elapsed(),
            })
        } else {
            let text = response.text().await.unwrap_or_default();
            Err(ConsulError::api_error(status.as_u16(), text))
        }
    }

    /// Handle response and parse JSON
    async fn handle_response_json<T: serde::de::DeserializeOwned>(
        &self,
        response: Response,
    ) -> Result<T> {
        let status = response.status();

        if status.is_success() {
            response.json::<T>().await.map_err(ConsulError::HttpError)
        } else {
            let text = response.text().await.unwrap_or_default();
            error!("API error: {} - {}", status, text);
            Err(ConsulError::api_error(status.as_u16(), text))
        }
    }

    /// Parse query metadata from response headers
    fn parse_query_meta(&self, response: &Response, request_time: Duration) -> QueryMeta {
        let mut meta = QueryMeta {
            request_time,
            ..Default::default()
        };

        if let Some(index) = response.headers().get("X-Consul-Index") {
            if let Ok(s) = index.to_str() {
                meta.last_index = s.parse().unwrap_or(0);
            }
        }

        if let Some(leader) = response.headers().get("X-Consul-KnownLeader") {
            if let Ok(s) = leader.to_str() {
                meta.known_leader = s == "true";
            }
        }

        if let Some(contact) = response.headers().get("X-Consul-LastContact") {
            if let Ok(s) = contact.to_str() {
                if let Ok(ms) = s.parse::<u64>() {
                    meta.last_contact = Duration::from_millis(ms);
                }
            }
        }

        if let Some(translate) = response.headers().get("X-Consul-Translate-Addresses") {
            if let Ok(s) = translate.to_str() {
                meta.address_translation_enabled = s == "true";
            }
        }

        if let Some(hit) = response.headers().get("X-Cache") {
            if let Ok(s) = hit.to_str() {
                meta.cache_hit = s == "HIT";
            }
        }

        if let Some(age) = response.headers().get("Age") {
            if let Ok(s) = age.to_str() {
                if let Ok(secs) = s.parse::<u64>() {
                    meta.cache_age = Some(Duration::from_secs(secs));
                }
            }
        }

        meta
    }

    /// Check if server is reachable
    pub async fn ping(&self) -> Result<()> {
        let response = self.get("/v1/status/leader").send().await.map_err(ConsulError::HttpError)?;

        if response.status().is_success() {
            Ok(())
        } else {
            Err(ConsulError::api_error(
                response.status().as_u16(),
                "failed to connect to Consul",
            ))
        }
    }
}

impl Clone for HttpClient {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            client: self.client.clone(),
        }
    }
}