1use super::{ConfigStorage, Error, LoadConfigOptions, Result};
16use super::{Observer, PingapConf};
17use async_trait::async_trait;
18use etcd_client::{Client, ConnectOptions, GetOptions, WatchOptions};
19use humantime::parse_duration;
20use substring::Substring;
21
22pub struct EtcdStorage {
23 path: String,
25 addrs: Vec<String>,
27 options: ConnectOptions,
29 separation: bool,
31}
32
/// URL scheme prefix of an etcd storage connection string; its length is
/// what [`EtcdStorage::new`] skips before parsing hosts, path and query.
pub const ETCD_PROTOCOL: &str = "etcd://";
34
35impl EtcdStorage {
36 pub fn new(value: &str) -> Result<Self> {
39 let mut hosts = "".to_string();
40 let mut path = "".to_string();
41 let mut query = "".to_string();
42 if let Some((value1, value2)) = value
43 .substring(ETCD_PROTOCOL.len(), value.len())
44 .split_once('/')
45 {
46 hosts = value1.to_string();
47 let arr: Vec<&str> = value2.split('?').collect();
48 path = format!("/{}", arr[0]);
49 if arr.len() == 2 {
50 query = arr[1].to_string();
51 }
52 }
53
54 let addrs: Vec<String> =
55 hosts.split(',').map(|item| item.to_string()).collect();
56 let mut user = "".to_string();
57 let mut password = "".to_string();
58 let mut options = ConnectOptions::default();
59 let mut separation = false;
60 for (key, value) in pingap_core::convert_query_map(&query) {
61 match key.as_str() {
62 "user" => user = value,
63 "password" => password = value,
64 "timeout" => {
65 if let Ok(d) = parse_duration(&value) {
66 options = options.with_timeout(d);
67 }
68 },
69 "connect_timeout" => {
70 if let Ok(d) = parse_duration(&value) {
71 options = options.with_connect_timeout(d);
72 }
73 },
74 "separation" => {
75 separation = true;
76 },
77 _ => {},
78 }
79 }
80
81 if !user.is_empty() && !password.is_empty() {
82 options = options.with_user(user, password);
83 };
84 Ok(Self {
85 addrs,
86 options,
87 path,
88 separation,
89 })
90 }
91 async fn connect(&self) -> Result<Client> {
93 Client::connect(&self.addrs, Some(self.options.clone()))
94 .await
95 .map_err(|e| Error::Etcd {
96 source: Box::new(e),
97 })
98 }
99}
100
101#[async_trait]
102impl ConfigStorage for EtcdStorage {
103 async fn load_config(&self, opts: LoadConfigOptions) -> Result<PingapConf> {
105 let mut c = self.connect().await?;
106 let replace_include = opts.replace_include;
107 let mut opts = GetOptions::new();
108 opts = opts.with_prefix();
109 let arr = c
110 .get(self.path.as_bytes(), Some(opts))
111 .await
112 .map_err(|e| Error::Etcd {
113 source: Box::new(e),
114 })?
115 .take_kvs();
116 let mut buffer = vec![];
117 for item in arr {
118 buffer.extend(item.value());
119 buffer.push(0x0a);
120 }
121 PingapConf::new(buffer.as_slice(), replace_include)
122 }
123 async fn save_config(
127 &self,
128 conf: &PingapConf,
129 category: &str,
130 name: Option<&str>,
131 ) -> Result<()> {
132 conf.validate()?;
133 let (path, toml_value) = if self.separation && name.is_some() {
134 conf.get_toml(category, name)?
135 } else {
136 conf.get_toml(category, None)?
137 };
138 let key = pingap_util::path_join(&self.path, &path);
139 let mut c = self.connect().await?;
140 if toml_value.is_empty() {
141 c.delete(key, None).await.map_err(|e| Error::Etcd {
142 source: Box::new(e),
143 })?;
144 } else {
145 c.put(key, toml_value, None)
146 .await
147 .map_err(|e| Error::Etcd {
148 source: Box::new(e),
149 })?;
150 }
151 Ok(())
152 }
153 fn support_observer(&self) -> bool {
155 true
156 }
157 async fn observe(&self) -> Result<Observer> {
161 let mut c = self.connect().await?;
164 let (_, stream) = c
165 .watch(
166 self.path.as_bytes(),
167 Some(WatchOptions::default().with_prefix()),
168 )
169 .await
170 .map_err(|e| Error::Etcd {
171 source: Box::new(e),
172 })?;
173 Ok(Observer {
174 etcd_watch_stream: Some(stream),
175 })
176 }
177 async fn save(&self, key: &str, data: &[u8]) -> Result<()> {
179 let key = pingap_util::path_join(&self.path, key);
180 let mut c = self.connect().await?;
181 c.put(key, data, None).await.map_err(|e| Error::Etcd {
182 source: Box::new(e),
183 })?;
184 Ok(())
185 }
186 async fn load(&self, key: &str) -> Result<Vec<u8>> {
188 let key = pingap_util::path_join(&self.path, key);
189 let mut c = self.connect().await?;
190 let arr = c
191 .get(key, None)
192 .await
193 .map_err(|e| Error::Etcd {
194 source: Box::new(e),
195 })?
196 .take_kvs();
197 let buf = if arr.is_empty() { b"" } else { arr[0].value() };
198 Ok(buf.into())
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::EtcdStorage;
    use crate::{
        read_all_toml_files, ConfigStorage, LoadConfigOptions, PingapConf,
        CATEGORY_BASIC, CATEGORY_LOCATION, CATEGORY_PLUGIN, CATEGORY_SERVER,
        CATEGORY_STORAGE, CATEGORY_UPSTREAM,
    };
    use nanoid::nanoid;
    use pretty_assertions::assert_eq;

    /// Saves every config category under a random key prefix, loads the
    /// config back and checks that its hash matches the source config.
    ///
    /// Connects to an etcd endpoint at `127.0.0.1:2379`.
    #[tokio::test]
    async fn test_etcd_storage() {
        // Random prefix so separate runs do not share keys.
        let prefix = nanoid!(16);
        let url = format!(
            "etcd://127.0.0.1:2379/{}?timeout=10s&connect_timeout=5s",
            prefix
        );
        let storage = EtcdStorage::new(&url).unwrap();

        let toml_data = read_all_toml_files("../../conf").await.unwrap();
        let raw = toml_data.to_vec();
        let conf = PingapConf::new(raw.as_slice(), false).unwrap();

        // Same order as the categories were originally written in.
        for category in [
            CATEGORY_BASIC,
            CATEGORY_UPSTREAM,
            CATEGORY_LOCATION,
            CATEGORY_PLUGIN,
            CATEGORY_SERVER,
            CATEGORY_STORAGE,
        ] {
            storage.save_config(&conf, category, None).await.unwrap();
        }

        let current_conf = storage
            .load_config(LoadConfigOptions::default())
            .await
            .unwrap();
        assert_eq!(current_conf.hash().unwrap(), conf.hash().unwrap());
    }
}