pingap_config/
etcd.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{ConfigStorage, Error, LoadConfigOptions, Result};
16use super::{Observer, PingapConf};
17use async_trait::async_trait;
18use etcd_client::{Client, ConnectOptions, GetOptions, WatchOptions};
19use humantime::parse_duration;
20use substring::Substring;
21
/// Config storage backed by etcd.
///
/// Instances are built from an `etcd://` connection URL by
/// `EtcdStorage::new`. The struct only holds the parsed settings; a client
/// connection is opened by each operation that needs one.
pub struct EtcdStorage {
    // Base path (key prefix) for all config entries in etcd; keys are built
    // by joining this path with a per-entry suffix
    path: String,
    // List of etcd server addresses, taken from the comma separated host
    // part of the connection URL
    addrs: Vec<String>,
    // Connection options (timeout, connect timeout, user/password)
    options: ConnectOptions,
    // Whether to separate config entries by category/name: when true and a
    // name is given, `save_config` requests only that named entry of the
    // category instead of the whole category
    separation: bool,
}
32
/// URL scheme prefix of an etcd connection URL (`etcd://host:port/path`).
/// `EtcdStorage::new` skips this many leading characters before parsing
/// the hosts, path and query.
pub const ETCD_PROTOCOL: &str = "etcd://";
34
35impl EtcdStorage {
36    /// Create a new etcd storage for config.
37    /// Connection url format: etcd://host1:port1,host2:port2/pingap?timeout=10s&connect_timeout=5s&user=**&password=**
38    pub fn new(value: &str) -> Result<Self> {
39        let mut hosts = "".to_string();
40        let mut path = "".to_string();
41        let mut query = "".to_string();
42        if let Some((value1, value2)) = value
43            .substring(ETCD_PROTOCOL.len(), value.len())
44            .split_once('/')
45        {
46            hosts = value1.to_string();
47            let arr: Vec<&str> = value2.split('?').collect();
48            path = format!("/{}", arr[0]);
49            if arr.len() == 2 {
50                query = arr[1].to_string();
51            }
52        }
53
54        let addrs: Vec<String> =
55            hosts.split(',').map(|item| item.to_string()).collect();
56        let mut user = "".to_string();
57        let mut password = "".to_string();
58        let mut options = ConnectOptions::default();
59        let mut separation = false;
60        for (key, value) in pingap_core::convert_query_map(&query) {
61            match key.as_str() {
62                "user" => user = value,
63                "password" => password = value,
64                "timeout" => {
65                    if let Ok(d) = parse_duration(&value) {
66                        options = options.with_timeout(d);
67                    }
68                },
69                "connect_timeout" => {
70                    if let Ok(d) = parse_duration(&value) {
71                        options = options.with_connect_timeout(d);
72                    }
73                },
74                "separation" => {
75                    separation = true;
76                },
77                _ => {},
78            }
79        }
80
81        if !user.is_empty() && !password.is_empty() {
82            options = options.with_user(user, password);
83        };
84        Ok(Self {
85            addrs,
86            options,
87            path,
88            separation,
89        })
90    }
91    /// Connect to etcd server.
92    async fn connect(&self) -> Result<Client> {
93        Client::connect(&self.addrs, Some(self.options.clone()))
94            .await
95            .map_err(|e| Error::Etcd {
96                source: Box::new(e),
97            })
98    }
99}
100
101#[async_trait]
102impl ConfigStorage for EtcdStorage {
103    /// Load config from etcd by fetching all keys under the base path
104    async fn load_config(&self, opts: LoadConfigOptions) -> Result<PingapConf> {
105        let mut c = self.connect().await?;
106        let replace_include = opts.replace_include;
107        let mut opts = GetOptions::new();
108        opts = opts.with_prefix();
109        let arr = c
110            .get(self.path.as_bytes(), Some(opts))
111            .await
112            .map_err(|e| Error::Etcd {
113                source: Box::new(e),
114            })?
115            .take_kvs();
116        let mut buffer = vec![];
117        for item in arr {
118            buffer.extend(item.value());
119            buffer.push(0x0a);
120        }
121        PingapConf::new(buffer.as_slice(), replace_include)
122    }
123    /// Save config to etcd, optionally separating by category/name
124    /// If separation is enabled and name is provided, saves individual sections
125    /// Otherwise saves the entire category as one entry
126    async fn save_config(
127        &self,
128        conf: &PingapConf,
129        category: &str,
130        name: Option<&str>,
131    ) -> Result<()> {
132        conf.validate()?;
133        let (path, toml_value) = if self.separation && name.is_some() {
134            conf.get_toml(category, name)?
135        } else {
136            conf.get_toml(category, None)?
137        };
138        let key = pingap_util::path_join(&self.path, &path);
139        let mut c = self.connect().await?;
140        if toml_value.is_empty() {
141            c.delete(key, None).await.map_err(|e| Error::Etcd {
142                source: Box::new(e),
143            })?;
144        } else {
145            c.put(key, toml_value, None)
146                .await
147                .map_err(|e| Error::Etcd {
148                    source: Box::new(e),
149                })?;
150        }
151        Ok(())
152    }
153    /// Indicates that this storage supports watching for changes
154    fn support_observer(&self) -> bool {
155        true
156    }
157    /// Sets up a watch on the config path to observe changes
158    /// Note: May miss changes if processing takes too long between updates
159    /// Should be used with periodic full fetches to ensure consistency
160    async fn observe(&self) -> Result<Observer> {
161        // 逻辑并不完善,有可能因为变更处理中途又发生其它变更导致缺失
162        // 因此还需配合fetch的形式比对
163        let mut c = self.connect().await?;
164        let (_, stream) = c
165            .watch(
166                self.path.as_bytes(),
167                Some(WatchOptions::default().with_prefix()),
168            )
169            .await
170            .map_err(|e| Error::Etcd {
171                source: Box::new(e),
172            })?;
173        Ok(Observer {
174            etcd_watch_stream: Some(stream),
175        })
176    }
177    /// Save key-value data under the base path
178    async fn save(&self, key: &str, data: &[u8]) -> Result<()> {
179        let key = pingap_util::path_join(&self.path, key);
180        let mut c = self.connect().await?;
181        c.put(key, data, None).await.map_err(|e| Error::Etcd {
182            source: Box::new(e),
183        })?;
184        Ok(())
185    }
186    /// Load key-value data from under the base path
187    async fn load(&self, key: &str) -> Result<Vec<u8>> {
188        let key = pingap_util::path_join(&self.path, key);
189        let mut c = self.connect().await?;
190        let arr = c
191            .get(key, None)
192            .await
193            .map_err(|e| Error::Etcd {
194                source: Box::new(e),
195            })?
196            .take_kvs();
197        let buf = if arr.is_empty() { b"" } else { arr[0].value() };
198        Ok(buf.into())
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::EtcdStorage;
    use crate::{
        read_all_toml_files, ConfigStorage, LoadConfigOptions, PingapConf,
        CATEGORY_BASIC, CATEGORY_LOCATION, CATEGORY_PLUGIN, CATEGORY_SERVER,
        CATEGORY_STORAGE, CATEGORY_UPSTREAM,
    };
    use nanoid::nanoid;
    use pretty_assertions::assert_eq;

    /// Saves every config category under a random base path and checks that
    /// loading it back yields a config with the same hash.
    ///
    /// Connects to the etcd address 127.0.0.1:2379 given in the url.
    #[tokio::test]
    async fn test_etcd_storage() {
        // A random base path so this run does not reuse another run's keys.
        let base = nanoid!(16);
        let url = format!(
            "etcd://127.0.0.1:2379/{}?timeout=10s&connect_timeout=5s",
            base
        );
        let storage = EtcdStorage::new(&url).unwrap();
        let toml_data = read_all_toml_files("../../conf").await.unwrap();
        let conf =
            PingapConf::new(toml_data.to_vec().as_slice(), false).unwrap();

        let categories = [
            CATEGORY_BASIC,
            CATEGORY_UPSTREAM,
            CATEGORY_LOCATION,
            CATEGORY_PLUGIN,
            CATEGORY_SERVER,
            CATEGORY_STORAGE,
        ];
        for category in categories {
            storage.save_config(&conf, category, None).await.unwrap();
        }

        let loaded = storage
            .load_config(LoadConfigOptions::default())
            .await
            .unwrap();
        assert_eq!(loaded.hash().unwrap(), conf.hash().unwrap());
    }
}