use std::sync::Arc;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use serde::{Deserialize, Serialize};
use crate::client::HttpClient;
use crate::error::{ConsulError, Result};
use crate::types::{QueryMeta, QueryOptions, WriteMeta, WriteOptions};
/// A single key/value entry as exchanged with the `/v1/kv/` endpoints.
///
/// Field names are (de)serialized in PascalCase (`Key`, `CreateIndex`, ...).
/// `value` carries the payload base64-encoded; the `KVPair` helper methods
/// (`new`, `set_value`, `value_bytes`, `value_string`) do the encoding and
/// decoding so callers can work with raw bytes or UTF-8 text.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct KVPair {
/// Key of the entry; appended verbatim to `/v1/kv/` when building request paths.
pub key: String,
/// Presumably the index at which the entry was created -- confirm against the
/// Consul KV API docs. Set to 0 by `KVPair::new`.
pub create_index: u64,
/// Index sent as the `cas` query parameter by `KV::cas` and `KV::delete_cas`.
/// Set to 0 by `KVPair::new`.
pub modify_index: u64,
/// Presumably counts successful lock acquisitions -- confirm against the
/// Consul KV API docs. Set to 0 by `KVPair::new`.
pub lock_index: u64,
/// Caller-defined number; sent as the `flags` query parameter on writes when non-zero.
pub flags: u64,
/// Base64-encoded payload. Defaults to `None` when the field is absent on deserialization.
#[serde(default)]
pub value: Option<String>,
/// Session identifier; required by `KV::acquire` and `KV::release`.
/// Defaults to `None` when the field is absent on deserialization.
#[serde(default)]
pub session: Option<String>,
}
impl KVPair {
    /// Builds a pair for `key` whose payload is `value`, stored base64-encoded.
    ///
    /// All indexes and `flags` start at 0 and no session is attached.
    pub fn new(key: &str, value: &[u8]) -> Self {
        let encoded = BASE64.encode(value);
        Self {
            key: key.to_owned(),
            value: Some(encoded),
            session: None,
            flags: 0,
            create_index: 0,
            modify_index: 0,
            lock_index: 0,
        }
    }

    /// Same as [`KVPair::new`], taking the payload as UTF-8 text.
    pub fn new_string(key: &str, value: &str) -> Self {
        Self::new(key, value.as_bytes())
    }

    /// Returns the decoded payload.
    ///
    /// Yields `None` both when there is no value and when the stored string
    /// is not valid base64.
    pub fn value_bytes(&self) -> Option<Vec<u8>> {
        let encoded = self.value.as_deref()?;
        BASE64.decode(encoded).ok()
    }

    /// Returns the decoded payload as a `String`.
    ///
    /// Yields `None` when [`KVPair::value_bytes`] does, or when the decoded
    /// bytes are not valid UTF-8.
    pub fn value_string(&self) -> Option<String> {
        let raw = self.value_bytes()?;
        String::from_utf8(raw).ok()
    }

    /// Replaces the payload with `value`, stored base64-encoded.
    pub fn set_value(&mut self, value: &[u8]) {
        let encoded = BASE64.encode(value);
        self.value = Some(encoded);
    }

    /// Replaces the payload with the bytes of `value`.
    pub fn set_value_string(&mut self, value: &str) {
        self.set_value(value.as_bytes());
    }

    /// Builder-style setter for `flags`.
    pub fn with_flags(self, flags: u64) -> Self {
        Self { flags, ..self }
    }

    /// Builder-style setter for `session`.
    pub fn with_session(self, session: &str) -> Self {
        Self {
            session: Some(session.to_owned()),
            ..self
        }
    }
}
/// Handle for the `/v1/kv/` endpoints; every request goes through the shared [`HttpClient`].
pub struct KV {
/// Shared HTTP client used to build and execute requests.
client: Arc<HttpClient>,
}
/// Operations on the `/v1/kv/` endpoints.
///
/// Keys, prefixes, separators and session ids are interpolated into the
/// request path exactly as given; nothing in this block escapes them, so
/// callers must pass values that are already URL-safe.
///
/// NOTE(review): every read method returns `QueryMeta::default()`; nothing
/// from the response is copied into the returned metadata here.
impl KV {
    /// Creates a KV handle that sends its requests through `client`.
    pub fn new(client: Arc<HttpClient>) -> Self {
        Self { client }
    }

    /// Fetches the entry stored under `key`.
    ///
    /// Returns `None` for the pair when the server answers 404. On success the
    /// body is parsed as a list of pairs and the first one is returned.
    ///
    /// # Errors
    /// Propagates errors from executing the request, `ConsulError::HttpError`
    /// when the body cannot be parsed, and an API error carrying status and
    /// body text for any other non-success status.
    pub async fn get(&self, key: &str, opts: Option<&QueryOptions>) -> Result<(Option<KVPair>, QueryMeta)> {
        let path = format!("/v1/kv/{}", key);
        let mut builder = self.client.get(&path);
        if let Some(opts) = opts {
            builder = self.client.apply_query_options(builder, opts);
        }
        let response = self.client.execute(builder).await?;
        let meta = QueryMeta::default();
        let status = response.status();
        if status.as_u16() == 404 {
            return Ok((None, meta));
        }
        if !status.is_success() {
            let text = response.text().await.unwrap_or_default();
            return Err(ConsulError::api_error(status.as_u16(), text));
        }
        let pairs: Vec<KVPair> = response.json().await.map_err(ConsulError::HttpError)?;
        Ok((pairs.into_iter().next(), meta))
    }

    /// Lists every entry under `prefix` (sends the `recurse` query flag).
    ///
    /// A 404 answer yields an empty list rather than an error.
    ///
    /// # Errors
    /// Same failure modes as [`KV::get`].
    pub async fn list(&self, prefix: &str, opts: Option<&QueryOptions>) -> Result<(Vec<KVPair>, QueryMeta)> {
        let path = format!("/v1/kv/{}?recurse", prefix);
        let mut builder = self.client.get(&path);
        if let Some(opts) = opts {
            builder = self.client.apply_query_options(builder, opts);
        }
        let response = self.client.execute(builder).await?;
        let meta = QueryMeta::default();
        let status = response.status();
        if status.as_u16() == 404 {
            return Ok((Vec::new(), meta));
        }
        if !status.is_success() {
            let text = response.text().await.unwrap_or_default();
            return Err(ConsulError::api_error(status.as_u16(), text));
        }
        let pairs: Vec<KVPair> = response.json().await.map_err(ConsulError::HttpError)?;
        Ok((pairs, meta))
    }

    /// Lists key names under `prefix` (sends the `keys` query flag), optionally
    /// passing `separator` along as the `separator` query parameter.
    ///
    /// A 404 answer yields an empty list rather than an error.
    ///
    /// # Errors
    /// Same failure modes as [`KV::get`].
    pub async fn keys(&self, prefix: &str, separator: Option<&str>, opts: Option<&QueryOptions>) -> Result<(Vec<String>, QueryMeta)> {
        let mut path = format!("/v1/kv/{}?keys", prefix);
        if let Some(sep) = separator {
            path.push_str("&separator=");
            path.push_str(sep);
        }
        let mut builder = self.client.get(&path);
        if let Some(opts) = opts {
            builder = self.client.apply_query_options(builder, opts);
        }
        let response = self.client.execute(builder).await?;
        let meta = QueryMeta::default();
        let status = response.status();
        if status.as_u16() == 404 {
            return Ok((Vec::new(), meta));
        }
        if !status.is_success() {
            let text = response.text().await.unwrap_or_default();
            return Err(ConsulError::api_error(status.as_u16(), text));
        }
        let keys: Vec<String> = response.json().await.map_err(ConsulError::HttpError)?;
        Ok((keys, meta))
    }

    /// Writes `pair` under its key, sending `flags` only when non-zero.
    ///
    /// The boolean in the result is whatever `HttpClient::write_bool` reports.
    ///
    /// # Errors
    /// Returns `ConsulError::InvalidConfig` when `pair.value` is set but is not
    /// valid base64 (no request is sent in that case), plus any error from
    /// `HttpClient::write_bool`.
    pub async fn put(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let mut path = format!("/v1/kv/{}", pair.key);
        if pair.flags != 0 {
            // `flags` is the only query parameter on a plain put, so it opens the query string.
            path.push_str(&format!("?flags={}", pair.flags));
        }
        self.put_pair(path, pair, opts).await
    }

    /// Convenience wrapper around [`KV::put`] for a UTF-8 `value`.
    pub async fn put_string(&self, key: &str, value: &str, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let pair = KVPair::new_string(key, value);
        self.put(&pair, opts).await
    }

    /// Writes `pair` with `cas=<pair.modify_index>` in the query string.
    ///
    /// # Errors
    /// Same failure modes as [`KV::put`].
    pub async fn cas(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let mut path = format!("/v1/kv/{}?cas={}", pair.key, pair.modify_index);
        Self::push_flags(&mut path, pair.flags);
        self.put_pair(path, pair, opts).await
    }

    /// Deletes the entry stored under `key`.
    ///
    /// # Errors
    /// Propagates any error from `HttpClient::write_bool`.
    pub async fn delete(&self, key: &str, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        self.delete_path(format!("/v1/kv/{}", key), opts).await
    }

    /// Deletes every entry under `prefix` (sends the `recurse` query flag).
    ///
    /// # Errors
    /// Propagates any error from `HttpClient::write_bool`.
    pub async fn delete_tree(&self, prefix: &str, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        self.delete_path(format!("/v1/kv/{}?recurse", prefix), opts).await
    }

    /// Deletes `pair.key` with `cas=<pair.modify_index>` in the query string.
    ///
    /// # Errors
    /// Propagates any error from `HttpClient::write_bool`.
    pub async fn delete_cas(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let path = format!("/v1/kv/{}?cas={}", pair.key, pair.modify_index);
        self.delete_path(path, opts).await
    }

    /// Writes `pair` with `acquire=<pair.session>` in the query string.
    ///
    /// # Errors
    /// Returns `ConsulError::InvalidConfig` when `pair.session` is `None` or
    /// when `pair.value` is not valid base64, plus any error from
    /// `HttpClient::write_bool`.
    pub async fn acquire(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let session = pair.session.as_ref().ok_or_else(|| {
            ConsulError::InvalidConfig("session is required for lock acquisition".to_string())
        })?;
        let mut path = format!("/v1/kv/{}?acquire={}", pair.key, session);
        Self::push_flags(&mut path, pair.flags);
        self.put_pair(path, pair, opts).await
    }

    /// Writes `pair` with `release=<pair.session>` in the query string.
    ///
    /// # Errors
    /// Returns `ConsulError::InvalidConfig` when `pair.session` is `None` or
    /// when `pair.value` is not valid base64, plus any error from
    /// `HttpClient::write_bool`.
    pub async fn release(&self, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let session = pair.session.as_ref().ok_or_else(|| {
            ConsulError::InvalidConfig("session is required for lock release".to_string())
        })?;
        let mut path = format!("/v1/kv/{}?release={}", pair.key, session);
        Self::push_flags(&mut path, pair.flags);
        self.put_pair(path, pair, opts).await
    }

    /// Appends `&flags=<flags>` to a path that already has a query string; no-op for 0.
    fn push_flags(path: &mut String, flags: u64) {
        if flags != 0 {
            path.push_str(&format!("&flags={}", flags));
        }
    }

    /// Decodes the base64 payload of `pair` into the raw bytes to send as the request body.
    ///
    /// Returns `Ok(None)` when the pair has no value. An undecodable value is
    /// reported as an error instead of being dropped, because sending the
    /// request without a body would silently store an empty value.
    fn decode_value(pair: &KVPair) -> Result<Option<Vec<u8>>> {
        match pair.value.as_deref() {
            None => Ok(None),
            Some(encoded) => BASE64.decode(encoded).map(Some).map_err(|err| {
                ConsulError::InvalidConfig(format!(
                    "value for key '{}' is not valid base64: {}",
                    pair.key, err
                ))
            }),
        }
    }

    /// Sends a PUT to `path` with the decoded value of `pair` as the body (when present).
    async fn put_pair(&self, path: String, pair: &KVPair, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        // Validate the payload first so a malformed value never reaches the server.
        let body = Self::decode_value(pair)?;
        let mut builder = self.client.put(&path);
        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }
        if let Some(bytes) = body {
            builder = builder.body(bytes);
        }
        self.client.write_bool(builder).await
    }

    /// Sends a DELETE to `path`.
    async fn delete_path(&self, path: String, opts: Option<&WriteOptions>) -> Result<(bool, WriteMeta)> {
        let mut builder = self.client.delete(&path);
        if let Some(opts) = opts {
            builder = self.client.apply_write_options(builder, opts);
        }
        self.client.write_bool(builder).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Text stored through `new_string` comes back unchanged from `value_string`.
    #[test]
    fn test_kv_pair_value() {
        let decoded = KVPair::new_string("test/key", "hello world").value_string();
        assert_eq!(decoded, Some(String::from("hello world")));
    }

    /// Arbitrary bytes (including non-UTF-8 ones) round-trip through `new` / `value_bytes`.
    #[test]
    fn test_kv_pair_bytes() {
        let payload: Vec<u8> = vec![0x00, 0x01, 0x02, 0xff];
        let pair = KVPair::new("test/binary", &payload);
        let round_tripped = pair.value_bytes();
        assert_eq!(round_tripped, Some(payload));
    }
}