df_consul/
kv.rs

1use std::collections::HashMap;
2
3use anyhow::{anyhow, Result};
4use bytes::Bytes;
5use log::*;
6use reqwest::StatusCode;
7use serde::{Deserialize, Serialize};
8
9use crate::{Consul, WithIndex};
10
11#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
12#[serde(rename_all = "PascalCase")]
13pub struct KvGetPrefixEntry {
14    pub key: String,
15    pub value: String,
16}
17
18impl Consul {
19    pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
20        debug!("kv_get {}", key);
21
22        let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
23        let http = self.client.get(&url).send().await?;
24        match http.status() {
25            StatusCode::OK => Ok(Some(http.bytes().await?)),
26            StatusCode::NOT_FOUND => Ok(None),
27            _ => Err(anyhow!(
28                "Consul request failed: {:?}",
29                http.error_for_status()
30            )),
31        }
32    }
33
34    pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
35        debug!("kv_get_json {}", key);
36
37        let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
38        let http = self.client.get(&url).send().await?;
39        match http.status() {
40            StatusCode::OK => Ok(Some(http.json().await?)),
41            StatusCode::NOT_FOUND => Ok(None),
42            _ => Err(anyhow!(
43                "Consul request failed: {:?}",
44                http.error_for_status()
45            )),
46        }
47    }
48
49    pub async fn kv_get_prefix(
50        &self,
51        key_prefix: &str,
52        last_index: Option<usize>,
53    ) -> Result<WithIndex<HashMap<String, Bytes>>> {
54        debug!("kv_get_prefix {} index={:?}", key_prefix, last_index);
55        let results: WithIndex<Vec<KvGetPrefixEntry>> = self
56            .get_with_index(
57                format!(
58                    "{}/v1/kv/{}{}?recurse",
59                    self.url, self.kv_prefix, key_prefix
60                ),
61                last_index,
62            )
63            .await?;
64
65        let mut res = HashMap::new();
66        for ent in results.value {
67            res.insert(ent.key, Bytes::from(base64::decode(&ent.value)?));
68        }
69
70        Ok(WithIndex {
71            value: res,
72            index: results.index,
73        })
74    }
75
76    pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
77        debug!("kv_put {}", key);
78
79        let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
80        let http = self.client.put(&url).body(bytes).send().await?;
81        http.error_for_status()?;
82        Ok(())
83    }
84
85    pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
86        debug!("kv_put_json {}", key);
87
88        let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
89        let http = self.client.put(&url).json(value).send().await?;
90        http.error_for_status()?;
91        Ok(())
92    }
93
94    pub async fn kv_delete(&self, key: &str) -> Result<()> {
95        let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
96        let http = self.client.delete(&url).send().await?;
97        http.error_for_status()?;
98        Ok(())
99    }
100}