batata-consul-client 0.0.2

Rust client for HashiCorp Consul or batata
Documentation
use std::sync::Arc;

use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use serde::{Deserialize, Serialize};

use crate::client::HttpClient;
use crate::error::{ConsulError, Result};
use crate::types::{QueryMeta, QueryOptions, WriteMeta, WriteOptions};

/// Key-Value pair stored in Consul
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct KVPair {
    /// Key path
    pub key: String,
    /// Create index
    pub create_index: u64,
    /// Modify index
    pub modify_index: u64,
    /// Lock index
    pub lock_index: u64,
    /// Flags (user-defined)
    pub flags: u64,
    /// Base64-encoded value
    #[serde(default)]
    pub value: Option<String>,
    /// Session holding the lock (if any)
    #[serde(default)]
    pub session: Option<String>,
}

impl KVPair {
    /// Create a new KV pair with a key and value
    pub fn new(key: &str, value: &[u8]) -> Self {
        Self {
            key: key.to_string(),
            create_index: 0,
            modify_index: 0,
            lock_index: 0,
            flags: 0,
            value: Some(BASE64.encode(value)),
            session: None,
        }
    }

    /// Create a new KV pair with a string value
    pub fn new_string(key: &str, value: &str) -> Self {
        Self::new(key, value.as_bytes())
    }

    /// Get the decoded value as bytes
    pub fn value_bytes(&self) -> Option<Vec<u8>> {
        self.value.as_ref().and_then(|v| BASE64.decode(v).ok())
    }

    /// Get the decoded value as a UTF-8 string
    pub fn value_string(&self) -> Option<String> {
        self.value_bytes()
            .and_then(|b| String::from_utf8(b).ok())
    }

    /// Set the value from bytes
    pub fn set_value(&mut self, value: &[u8]) {
        self.value = Some(BASE64.encode(value));
    }

    /// Set the value from a string
    pub fn set_value_string(&mut self, value: &str) {
        self.set_value(value.as_bytes());
    }

    /// Set flags
    pub fn with_flags(mut self, flags: u64) -> Self {
        self.flags = flags;
        self
    }

    /// Set session for locking
    pub fn with_session(mut self, session: &str) -> Self {
        self.session = Some(session.to_string());
        self
    }
}

/// KV API client
pub struct KV {
    client: Arc<HttpClient>,
}

impl KV {
    /// Create a new KV client
    pub fn new(client: Arc<HttpClient>) -> Self {
        Self { client }
    }

    /// Get a single key-value pair
    pub async fn get(&self, key: &str, opts: Option<&QueryOptions>) -> Result<(Option<KVPair>, QueryMeta)> {
        let path = format!("/v1/kv/{}", key);
        let mut builder = self.client.get(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_query_options(builder, opts);
        }

        let response = self.client.execute(builder).await?;
        let meta = QueryMeta::default(); // TODO: Parse from headers

        if response.status().as_u16() == 404 {
            return Ok((None, meta));
        }

        let status = response.status();
        if status.is_success() {
            let pairs: Vec<KVPair> = response.json().await.map_err(ConsulError::HttpError)?;
            Ok((pairs.into_iter().next(), meta))
        } else {
            let text = response.text().await.unwrap_or_default();
            Err(ConsulError::api_error(status.as_u16(), text))
        }
    }

    /// Get all keys with a given prefix
    pub async fn list(&self, prefix: &str, opts: Option<&QueryOptions>) -> Result<(Vec<KVPair>, QueryMeta)> {
        let path = format!("/v1/kv/{}?recurse", prefix);
        let mut builder = self.client.get(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_query_options(builder, opts);
        }

        let response = self.client.execute(builder).await?;
        let meta = QueryMeta::default();

        let status = response.status();
        if status.as_u16() == 404 {
            return Ok((Vec::new(), meta));
        }

        if status.is_success() {
            let pairs: Vec<KVPair> = response.json().await.map_err(ConsulError::HttpError)?;
            Ok((pairs, meta))
        } else {
            let text = response.text().await.unwrap_or_default();
            Err(ConsulError::api_error(status.as_u16(), text))
        }
    }

    /// Get only the keys (not values) with a given prefix
    pub async fn keys(&self, prefix: &str, separator: Option<&str>, opts: Option<&QueryOptions>) -> Result<(Vec<String>, QueryMeta)> {
        let mut path = format!("/v1/kv/{}?keys", prefix);
        if let Some(sep) = separator {
            path.push_str(&format!("&separator={}", sep));
        }

        let mut builder = self.client.get(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_query_options(builder, opts);
        }

        let response = self.client.execute(builder).await?;
        let meta = QueryMeta::default();

        let status = response.status();
        if status.as_u16() == 404 {
            return Ok((Vec::new(), meta));
        }

        if status.is_success() {
            let keys: Vec<String> = response.json().await.map_err(ConsulError::HttpError)?;
            Ok((keys, meta))
        } else {
            let text = response.text().await.unwrap_or_default();
            Err(ConsulError::api_error(status.as_u16(), text))
        }
    }

    /// Put a key-value pair
    pub async fn put(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let mut path = format!("/v1/kv/{}", pair.key);
        let mut params = Vec::new();

        if pair.flags != 0 {
            params.push(format!("flags={}", pair.flags));
        }

        if !params.is_empty() {
            path.push('?');
            path.push_str(&params.join("&"));
        }

        let mut builder = self.client.put(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }

        // Set body to decoded value
        if let Some(ref encoded) = pair.value {
            if let Ok(decoded) = BASE64.decode(encoded) {
                builder = builder.body(decoded);
            }
        }

        self.client.write_bool(builder).await
    }

    /// Put a key with a string value (convenience method)
    pub async fn put_string(&self, key: &str, value: &str, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let pair = KVPair::new_string(key, value);
        self.put(&pair, opts).await
    }

    /// Put with Check-And-Set operation (compare modify_index)
    pub async fn cas(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let mut path = format!("/v1/kv/{}?cas={}", pair.key, pair.modify_index);

        if pair.flags != 0 {
            path.push_str(&format!("&flags={}", pair.flags));
        }

        let mut builder = self.client.put(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }

        if let Some(ref encoded) = pair.value {
            if let Ok(decoded) = BASE64.decode(encoded) {
                builder = builder.body(decoded);
            }
        }

        self.client.write_bool(builder).await
    }

    /// Delete a key
    pub async fn delete(&self, key: &str, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let path = format!("/v1/kv/{}", key);
        let mut builder = self.client.delete(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }

        self.client.write_bool(builder).await
    }

    /// Delete all keys with a given prefix
    pub async fn delete_tree(&self, prefix: &str, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let path = format!("/v1/kv/{}?recurse", prefix);
        let mut builder = self.client.delete(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }

        self.client.write_bool(builder).await
    }

    /// Delete with Check-And-Set operation
    pub async fn delete_cas(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let path = format!("/v1/kv/{}?cas={}", pair.key, pair.modify_index);
        let mut builder = self.client.delete(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }

        self.client.write_bool(builder).await
    }

    /// Acquire a lock on a key
    pub async fn acquire(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let session = pair.session.as_ref().ok_or_else(|| {
            ConsulError::InvalidConfig("session is required for lock acquisition".to_string())
        })?;

        let mut path = format!("/v1/kv/{}?acquire={}", pair.key, session);

        if pair.flags != 0 {
            path.push_str(&format!("&flags={}", pair.flags));
        }

        let mut builder = self.client.put(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }

        if let Some(ref encoded) = pair.value {
            if let Ok(decoded) = BASE64.decode(encoded) {
                builder = builder.body(decoded);
            }
        }

        self.client.write_bool(builder).await
    }

    /// Release a lock on a key
    pub async fn release(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let session = pair.session.as_ref().ok_or_else(|| {
            ConsulError::InvalidConfig("session is required for lock release".to_string())
        })?;

        let mut path = format!("/v1/kv/{}?release={}", pair.key, session);

        if pair.flags != 0 {
            path.push_str(&format!("&flags={}", pair.flags));
        }

        let mut builder = self.client.put(&path);

        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }

        if let Some(ref encoded) = pair.value {
            if let Ok(decoded) = BASE64.decode(encoded) {
                builder = builder.body(decoded);
            }
        }

        self.client.write_bool(builder).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kv_pair_value() {
        let pair = KVPair::new_string("test/key", "hello world");
        assert_eq!(pair.value_string(), Some("hello world".to_string()));
    }

    #[test]
    fn test_kv_pair_bytes() {
        let data = vec![0x00, 0x01, 0x02, 0xff];
        let pair = KVPair::new("test/binary", &data);
        assert_eq!(pair.value_bytes(), Some(data));
    }
}