1use std::collections::HashMap;
2
3use anyhow::{anyhow, Result};
4use bytes::Bytes;
5use log::*;
6use reqwest::StatusCode;
7use serde::{Deserialize, Serialize};
8
9use crate::{Consul, WithIndex};
10
11#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
12#[serde(rename_all = "PascalCase")]
13pub struct KvGetPrefixEntry {
14 pub key: String,
15 pub value: String,
16}
17
18impl Consul {
19 pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
20 debug!("kv_get {}", key);
21
22 let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
23 let http = self.client.get(&url).send().await?;
24 match http.status() {
25 StatusCode::OK => Ok(Some(http.bytes().await?)),
26 StatusCode::NOT_FOUND => Ok(None),
27 _ => Err(anyhow!(
28 "Consul request failed: {:?}",
29 http.error_for_status()
30 )),
31 }
32 }
33
34 pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
35 debug!("kv_get_json {}", key);
36
37 let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
38 let http = self.client.get(&url).send().await?;
39 match http.status() {
40 StatusCode::OK => Ok(Some(http.json().await?)),
41 StatusCode::NOT_FOUND => Ok(None),
42 _ => Err(anyhow!(
43 "Consul request failed: {:?}",
44 http.error_for_status()
45 )),
46 }
47 }
48
49 pub async fn kv_get_prefix(
50 &self,
51 key_prefix: &str,
52 last_index: Option<usize>,
53 ) -> Result<WithIndex<HashMap<String, Bytes>>> {
54 debug!("kv_get_prefix {} index={:?}", key_prefix, last_index);
55 let results: WithIndex<Vec<KvGetPrefixEntry>> = self
56 .get_with_index(
57 format!(
58 "{}/v1/kv/{}{}?recurse",
59 self.url, self.kv_prefix, key_prefix
60 ),
61 last_index,
62 )
63 .await?;
64
65 let mut res = HashMap::new();
66 for ent in results.value {
67 res.insert(ent.key, Bytes::from(base64::decode(&ent.value)?));
68 }
69
70 Ok(WithIndex {
71 value: res,
72 index: results.index,
73 })
74 }
75
76 pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
77 debug!("kv_put {}", key);
78
79 let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
80 let http = self.client.put(&url).body(bytes).send().await?;
81 http.error_for_status()?;
82 Ok(())
83 }
84
85 pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
86 debug!("kv_put_json {}", key);
87
88 let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
89 let http = self.client.put(&url).json(value).send().await?;
90 http.error_for_status()?;
91 Ok(())
92 }
93
94 pub async fn kv_delete(&self, key: &str) -> Result<()> {
95 let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
96 let http = self.client.delete(&url).send().await?;
97 http.error_for_status()?;
98 Ok(())
99 }
100}