pingap_config/
etcd.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{ConfigStorage, Error, LoadConfigOptions, Result};
16use super::{Observer, PingapConf};
17use async_trait::async_trait;
18use etcd_client::{Client, ConnectOptions, GetOptions, WatchOptions};
19use humantime::parse_duration;
20use substring::Substring;
21
22pub struct EtcdStorage {
23    // Base path for all config entries in etcd
24    path: String,
25    // List of etcd server addresses
26    addrs: Vec<String>,
27    // Connection options (timeout, auth, etc)
28    options: ConnectOptions,
29    // Whether to separate config entries by category/name
30    separation: bool,
31}
32
/// Scheme prefix of an etcd storage connection url.
pub const ETCD_PROTOCOL: &str = "etcd://";
34
35impl EtcdStorage {
36    /// Create a new etcd storage for config.
37    /// Connection url format: etcd://host1:port1,host2:port2/pingap?timeout=10s&connect_timeout=5s&user=**&password=**
38    pub fn new(value: &str) -> Result<Self> {
39        let mut hosts = "".to_string();
40        let mut path = "".to_string();
41        let mut query = "".to_string();
42        if let Some((value1, value2)) = value
43            .substring(ETCD_PROTOCOL.len(), value.len())
44            .split_once('/')
45        {
46            hosts = value1.to_string();
47            let arr: Vec<&str> = value2.split('?').collect();
48            path = format!("/{}", arr[0]);
49            if arr.len() == 2 {
50                query = arr[1].to_string();
51            }
52        }
53
54        let addrs: Vec<String> =
55            hosts.split(',').map(|item| item.to_string()).collect();
56        let mut user = "".to_string();
57        let mut password = "".to_string();
58        let mut options = ConnectOptions::default();
59        let mut separation = false;
60        for (key, value) in pingap_core::convert_query_map(&query) {
61            match key.as_str() {
62                "user" => user = value,
63                "password" => password = value,
64                "timeout" => {
65                    if let Ok(d) = parse_duration(&value) {
66                        options = options.with_timeout(d);
67                    }
68                },
69                "connect_timeout" => {
70                    if let Ok(d) = parse_duration(&value) {
71                        options = options.with_connect_timeout(d);
72                    }
73                },
74                "separation" => {
75                    separation = true;
76                },
77                _ => {},
78            }
79        }
80
81        if !user.is_empty() && !password.is_empty() {
82            options = options.with_user(user, password);
83        };
84        Ok(Self {
85            addrs,
86            options,
87            path,
88            separation,
89        })
90    }
91    /// Connect to etcd server.
92    async fn connect(&self) -> Result<Client> {
93        Client::connect(&self.addrs, Some(self.options.clone()))
94            .await
95            .map_err(|e| Error::Etcd { source: e })
96    }
97}
98
99#[async_trait]
100impl ConfigStorage for EtcdStorage {
101    /// Load config from etcd by fetching all keys under the base path
102    async fn load_config(&self, opts: LoadConfigOptions) -> Result<PingapConf> {
103        let mut c = self.connect().await?;
104        let replace_include = opts.replace_include;
105        let mut opts = GetOptions::new();
106        opts = opts.with_prefix();
107        let arr = c
108            .get(self.path.as_bytes(), Some(opts))
109            .await
110            .map_err(|e| Error::Etcd { source: e })?
111            .take_kvs();
112        let mut buffer = vec![];
113        for item in arr {
114            buffer.extend(item.value());
115            buffer.push(0x0a);
116        }
117        PingapConf::new(buffer.as_slice(), replace_include)
118    }
119    /// Save config to etcd, optionally separating by category/name
120    /// If separation is enabled and name is provided, saves individual sections
121    /// Otherwise saves the entire category as one entry
122    async fn save_config(
123        &self,
124        conf: &PingapConf,
125        category: &str,
126        name: Option<&str>,
127    ) -> Result<()> {
128        conf.validate()?;
129        let (path, toml_value) = if self.separation && name.is_some() {
130            conf.get_toml(category, name)?
131        } else {
132            conf.get_toml(category, None)?
133        };
134        let key = pingap_util::path_join(&self.path, &path);
135        let mut c = self.connect().await?;
136        if toml_value.is_empty() {
137            c.delete(key, None)
138                .await
139                .map_err(|e| Error::Etcd { source: e })?;
140        } else {
141            c.put(key, toml_value, None)
142                .await
143                .map_err(|e| Error::Etcd { source: e })?;
144        }
145        Ok(())
146    }
147    /// Indicates that this storage supports watching for changes
148    fn support_observer(&self) -> bool {
149        true
150    }
151    /// Sets up a watch on the config path to observe changes
152    /// Note: May miss changes if processing takes too long between updates
153    /// Should be used with periodic full fetches to ensure consistency
154    async fn observe(&self) -> Result<Observer> {
155        // 逻辑并不完善,有可能因为变更处理中途又发生其它变更导致缺失
156        // 因此还需配合fetch的形式比对
157        let mut c = self.connect().await?;
158        let (_, stream) = c
159            .watch(
160                self.path.as_bytes(),
161                Some(WatchOptions::default().with_prefix()),
162            )
163            .await
164            .map_err(|e| Error::Etcd { source: e })?;
165        Ok(Observer {
166            etcd_watch_stream: Some(stream),
167        })
168    }
169    /// Save key-value data under the base path
170    async fn save(&self, key: &str, data: &[u8]) -> Result<()> {
171        let key = pingap_util::path_join(&self.path, key);
172        let mut c = self.connect().await?;
173        c.put(key, data, None)
174            .await
175            .map_err(|e| Error::Etcd { source: e })?;
176        Ok(())
177    }
178    /// Load key-value data from under the base path
179    async fn load(&self, key: &str) -> Result<Vec<u8>> {
180        let key = pingap_util::path_join(&self.path, key);
181        let mut c = self.connect().await?;
182        let arr = c
183            .get(key, None)
184            .await
185            .map_err(|e| Error::Etcd { source: e })?
186            .take_kvs();
187        let buf = if arr.is_empty() { b"" } else { arr[0].value() };
188        Ok(buf.into())
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::EtcdStorage;
    use crate::{
        read_all_toml_files, ConfigStorage, LoadConfigOptions, PingapConf,
        CATEGORY_BASIC, CATEGORY_LOCATION, CATEGORY_PLUGIN, CATEGORY_SERVER,
        CATEGORY_STORAGE, CATEGORY_UPSTREAM,
    };
    use nanoid::nanoid;
    use pretty_assertions::assert_eq;

    /// Saves every config category to etcd under a random base path, loads
    /// the config back and checks that its hash matches the original.
    ///
    /// Connects to an etcd server at `127.0.0.1:2379`.
    #[tokio::test]
    async fn test_etcd_storage() {
        // Random base path so that runs do not share keys.
        let url = format!(
            "etcd://127.0.0.1:2379/{}?timeout=10s&connect_timeout=5s",
            nanoid!(16)
        );
        let storage = EtcdStorage::new(&url).unwrap();

        let toml_data = read_all_toml_files("../../conf").await.unwrap();
        let toml_bytes = toml_data.to_vec();
        let conf = PingapConf::new(toml_bytes.as_slice(), false).unwrap();

        // Same categories, in the same order, as separate calls would use.
        let categories = [
            CATEGORY_BASIC,
            CATEGORY_UPSTREAM,
            CATEGORY_LOCATION,
            CATEGORY_PLUGIN,
            CATEGORY_SERVER,
            CATEGORY_STORAGE,
        ];
        for category in categories {
            storage.save_config(&conf, category, None).await.unwrap();
        }

        let current_conf = storage
            .load_config(LoadConfigOptions::default())
            .await
            .unwrap();
        assert_eq!(current_conf.hash().unwrap(), conf.hash().unwrap());
    }
}