1use super::{ConfigStorage, Error, LoadConfigOptions, Result};
16use super::{Observer, PingapConf};
17use async_trait::async_trait;
18use etcd_client::{Client, ConnectOptions, GetOptions, WatchOptions};
19use humantime::parse_duration;
20use substring::Substring;
21
22pub struct EtcdStorage {
23 path: String,
25 addrs: Vec<String>,
27 options: ConnectOptions,
29 separation: bool,
31}
32
33pub const ETCD_PROTOCOL: &str = "etcd://";
34
35impl EtcdStorage {
36 pub fn new(value: &str) -> Result<Self> {
39 let mut hosts = "".to_string();
40 let mut path = "".to_string();
41 let mut query = "".to_string();
42 if let Some((value1, value2)) = value
43 .substring(ETCD_PROTOCOL.len(), value.len())
44 .split_once('/')
45 {
46 hosts = value1.to_string();
47 let arr: Vec<&str> = value2.split('?').collect();
48 path = format!("/{}", arr[0]);
49 if arr.len() == 2 {
50 query = arr[1].to_string();
51 }
52 }
53
54 let addrs: Vec<String> =
55 hosts.split(',').map(|item| item.to_string()).collect();
56 let mut user = "".to_string();
57 let mut password = "".to_string();
58 let mut options = ConnectOptions::default();
59 let mut separation = false;
60 for (key, value) in pingap_core::convert_query_map(&query) {
61 match key.as_str() {
62 "user" => user = value,
63 "password" => password = value,
64 "timeout" => {
65 if let Ok(d) = parse_duration(&value) {
66 options = options.with_timeout(d);
67 }
68 },
69 "connect_timeout" => {
70 if let Ok(d) = parse_duration(&value) {
71 options = options.with_connect_timeout(d);
72 }
73 },
74 "separation" => {
75 separation = true;
76 },
77 _ => {},
78 }
79 }
80
81 if !user.is_empty() && !password.is_empty() {
82 options = options.with_user(user, password);
83 };
84 Ok(Self {
85 addrs,
86 options,
87 path,
88 separation,
89 })
90 }
91 async fn connect(&self) -> Result<Client> {
93 Client::connect(&self.addrs, Some(self.options.clone()))
94 .await
95 .map_err(|e| Error::Etcd { source: e })
96 }
97}
98
99#[async_trait]
100impl ConfigStorage for EtcdStorage {
101 async fn load_config(&self, opts: LoadConfigOptions) -> Result<PingapConf> {
103 let mut c = self.connect().await?;
104 let replace_include = opts.replace_include;
105 let mut opts = GetOptions::new();
106 opts = opts.with_prefix();
107 let arr = c
108 .get(self.path.as_bytes(), Some(opts))
109 .await
110 .map_err(|e| Error::Etcd { source: e })?
111 .take_kvs();
112 let mut buffer = vec![];
113 for item in arr {
114 buffer.extend(item.value());
115 buffer.push(0x0a);
116 }
117 PingapConf::new(buffer.as_slice(), replace_include)
118 }
119 async fn save_config(
123 &self,
124 conf: &PingapConf,
125 category: &str,
126 name: Option<&str>,
127 ) -> Result<()> {
128 conf.validate()?;
129 let (path, toml_value) = if self.separation && name.is_some() {
130 conf.get_toml(category, name)?
131 } else {
132 conf.get_toml(category, None)?
133 };
134 let key = pingap_util::path_join(&self.path, &path);
135 let mut c = self.connect().await?;
136 if toml_value.is_empty() {
137 c.delete(key, None)
138 .await
139 .map_err(|e| Error::Etcd { source: e })?;
140 } else {
141 c.put(key, toml_value, None)
142 .await
143 .map_err(|e| Error::Etcd { source: e })?;
144 }
145 Ok(())
146 }
147 fn support_observer(&self) -> bool {
149 true
150 }
151 async fn observe(&self) -> Result<Observer> {
155 let mut c = self.connect().await?;
158 let (_, stream) = c
159 .watch(
160 self.path.as_bytes(),
161 Some(WatchOptions::default().with_prefix()),
162 )
163 .await
164 .map_err(|e| Error::Etcd { source: e })?;
165 Ok(Observer {
166 etcd_watch_stream: Some(stream),
167 })
168 }
169 async fn save(&self, key: &str, data: &[u8]) -> Result<()> {
171 let key = pingap_util::path_join(&self.path, key);
172 let mut c = self.connect().await?;
173 c.put(key, data, None)
174 .await
175 .map_err(|e| Error::Etcd { source: e })?;
176 Ok(())
177 }
178 async fn load(&self, key: &str) -> Result<Vec<u8>> {
180 let key = pingap_util::path_join(&self.path, key);
181 let mut c = self.connect().await?;
182 let arr = c
183 .get(key, None)
184 .await
185 .map_err(|e| Error::Etcd { source: e })?
186 .take_kvs();
187 let buf = if arr.is_empty() { b"" } else { arr[0].value() };
188 Ok(buf.into())
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use super::EtcdStorage;
195 use crate::{
196 read_all_toml_files, ConfigStorage, LoadConfigOptions, PingapConf,
197 CATEGORY_BASIC, CATEGORY_LOCATION, CATEGORY_PLUGIN, CATEGORY_SERVER,
198 CATEGORY_STORAGE, CATEGORY_UPSTREAM,
199 };
200 use nanoid::nanoid;
201 use pretty_assertions::assert_eq;
202
203 #[tokio::test]
204 async fn test_etcd_storage() {
205 let url = format!(
206 "etcd://127.0.0.1:2379/{}?timeout=10s&connect_timeout=5s",
207 nanoid!(16)
208 );
209 let storage = EtcdStorage::new(&url).unwrap();
210 let toml_data = read_all_toml_files("../../conf").await.unwrap();
211 let conf =
212 PingapConf::new(toml_data.to_vec().as_slice(), false).unwrap();
213
214 storage
215 .save_config(&conf, CATEGORY_BASIC, None)
216 .await
217 .unwrap();
218 storage
219 .save_config(&conf, CATEGORY_UPSTREAM, None)
220 .await
221 .unwrap();
222 storage
223 .save_config(&conf, CATEGORY_LOCATION, None)
224 .await
225 .unwrap();
226 storage
227 .save_config(&conf, CATEGORY_PLUGIN, None)
228 .await
229 .unwrap();
230 storage
231 .save_config(&conf, CATEGORY_SERVER, None)
232 .await
233 .unwrap();
234 storage
235 .save_config(&conf, CATEGORY_STORAGE, None)
236 .await
237 .unwrap();
238
239 let current_conf = storage
240 .load_config(LoadConfigOptions::default())
241 .await
242 .unwrap();
243 assert_eq!(current_conf.hash().unwrap(), conf.hash().unwrap());
244 }
245}