pingap_config/
lib.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// External crate imports for async operations, etcd client, and error handling
16use async_trait::async_trait;
17use etcd_client::WatchStream;
18use glob::glob;
19use once_cell::sync::OnceCell;
20use snafu::Snafu;
21use std::time::Duration;
22use tokio::fs;
23use tracing::debug;
24
25mod common;
26mod etcd;
27mod file;
28
29// Error enum for all possible configuration-related errors
30#[derive(Debug, Snafu)]
31pub enum Error {
32    #[snafu(display("Invalid error {message}"))]
33    Invalid { message: String },
34    #[snafu(display("Glob pattern error {source}, {path}"))]
35    Pattern {
36        source: glob::PatternError,
37        path: String,
38    },
39    #[snafu(display("Glob error {source}"))]
40    Glob { source: glob::GlobError },
41    #[snafu(display("Io error {source}, {file}"))]
42    Io {
43        source: std::io::Error,
44        file: String,
45    },
46    #[snafu(display("Toml de error {source}"))]
47    De { source: toml::de::Error },
48    #[snafu(display("Toml ser error {source}"))]
49    Ser { source: toml::ser::Error },
50    #[snafu(display("Url parse error {source}, {url}"))]
51    UrlParse {
52        source: url::ParseError,
53        url: String,
54    },
55    #[snafu(display("Addr parse error {source}, {addr}"))]
56    AddrParse {
57        source: std::net::AddrParseError,
58        addr: String,
59    },
60    #[snafu(display("Base64 decode error {source}"))]
61    Base64Decode { source: base64::DecodeError },
62    #[snafu(display("Regex error {source}"))]
63    Regex { source: regex::Error },
64    #[snafu(display("Etcd error {source}"))]
65    Etcd { source: Box<etcd_client::Error> },
66}
67type Result<T, E = Error> = std::result::Result<T, E>;
68
69// Observer struct for watching configuration changes
70pub struct Observer {
71    // Optional watch stream for etcd-based configuration
72    etcd_watch_stream: Option<WatchStream>,
73}
74
75impl Observer {
76    // Watches for configuration changes, returns true if changes detected
77    pub async fn watch(&mut self) -> Result<bool> {
78        let sleep_time = Duration::from_secs(30);
79        // no watch stream, just sleep a moment
80        let Some(stream) = self.etcd_watch_stream.as_mut() else {
81            tokio::time::sleep(sleep_time).await;
82            return Ok(false);
83        };
84        let resp = stream.message().await.map_err(|e| Error::Etcd {
85            source: Box::new(e),
86        })?;
87
88        Ok(resp.is_some())
89    }
90}
91
/// Options controlling how the configuration is loaded.
#[derive(Debug, Default, Clone)]
pub struct LoadConfigOptions {
    /// Whether to replace include directives.
    pub replace_include: bool,
    /// Whether this is an admin configuration.
    pub admin: bool,
}
98
99// Trait defining storage backend operations for configuration
100#[async_trait]
101pub trait ConfigStorage {
102    // Load configuration with specified options
103    async fn load_config(&self, opts: LoadConfigOptions) -> Result<PingapConf>;
104
105    // Save configuration for a specific category and optional name
106    async fn save_config(
107        &self,
108        conf: &PingapConf,
109        category: &str,
110        name: Option<&str>,
111    ) -> Result<()>;
112
113    // Whether this storage supports change observation
114    fn support_observer(&self) -> bool {
115        false
116    }
117
118    // Create an observer for this storage
119    async fn observe(&self) -> Result<Observer> {
120        Ok(Observer {
121            etcd_watch_stream: None,
122        })
123    }
124
125    // Low-level storage operations
126    async fn save(&self, key: &str, data: &[u8]) -> Result<()>;
127    async fn load(&self, key: &str) -> Result<Vec<u8>>;
128}
129
130// Global static storage for the configuration backend
131static CONFIG_STORAGE: OnceCell<Box<(dyn ConfigStorage + Sync + Send)>> =
132    OnceCell::new();
133
134// Creates a new configuration storage based on the path
135// Supports both etcd:// and file:// protocols
136fn new_config_storage(
137    path: &str,
138) -> Result<Box<(dyn ConfigStorage + Sync + Send)>> {
139    let s: Box<(dyn ConfigStorage + Sync + Send)> =
140        if path.starts_with(ETCD_PROTOCOL) {
141            let storage = EtcdStorage::new(path)?;
142            Box::new(storage)
143        } else {
144            let storage = FileStorage::new(path)?;
145            Box::new(storage)
146        };
147    Ok(s)
148}
149
150/// Initializes the configuration storage system if it hasn't been initialized yet
151///
152/// # Arguments
153/// * `path` - A string path that can be either a file path (file://) or etcd path (etcd://)
154///
155/// # Returns
156/// * `Result<&'static (dyn ConfigStorage + Sync + Send)>` - A reference to the initialized storage
157///   The reference has a static lifetime since it's stored in a global OnceCell
158pub fn try_init_config_storage(
159    path: &str,
160) -> Result<&'static (dyn ConfigStorage + Sync + Send)> {
161    // Attempts to get existing storage or initialize a new one if not present
162    // get_or_try_init ensures this initialization happens only once
163    let conf = CONFIG_STORAGE.get_or_try_init(|| new_config_storage(path))?;
164    // Returns a reference to the storage implementation
165    Ok(conf.as_ref())
166}
167
168pub async fn load_config(opts: LoadConfigOptions) -> Result<PingapConf> {
169    let Some(storage) = CONFIG_STORAGE.get() else {
170        return Err(Error::Invalid {
171            message: "storage is not inited".to_string(),
172        });
173    };
174    storage.load_config(opts).await
175}
176
177pub async fn read_all_toml_files(dir: &str) -> Result<Vec<u8>> {
178    let mut data = vec![];
179    for entry in
180        glob(&format!("{dir}/**/*.toml")).map_err(|e| Error::Pattern {
181            source: e,
182            path: dir.to_string(),
183        })?
184    {
185        let f = entry.map_err(|e| Error::Glob { source: e })?;
186        let mut buf = fs::read(&f).await.map_err(|e| Error::Io {
187            source: e,
188            file: f.to_string_lossy().to_string(),
189        })?;
190        debug!(filename = format!("{f:?}"), "read toml file");
191        // Append file contents and newline
192        data.append(&mut buf);
193        data.push(0x0a);
194    }
195    Ok(data)
196}
197
198pub fn support_observer() -> bool {
199    if let Some(storage) = CONFIG_STORAGE.get() {
200        storage.support_observer()
201    } else {
202        false
203    }
204}
205
206pub fn get_config_storage() -> Option<&'static (dyn ConfigStorage + Sync + Send)>
207{
208    if let Some(storage) = CONFIG_STORAGE.get() {
209        Some(storage.as_ref())
210    } else {
211        None
212    }
213}
214
215pub async fn save_config(
216    conf: &PingapConf,
217    category: &str,
218    name: Option<&str>,
219) -> Result<()> {
220    let Some(storage) = CONFIG_STORAGE.get() else {
221        return Err(Error::Invalid {
222            message: "storage is not inited".to_string(),
223        });
224    };
225    storage.save_config(conf, category, name).await
226}
227pub async fn sync_to_path(path: &str) -> Result<()> {
228    let conf = get_current_config();
229    let storage = new_config_storage(path)?;
230    sync_config(&conf, storage.as_ref()).await
231}
232
233pub async fn sync_config(
234    conf: &PingapConf,
235    storage: &(dyn ConfigStorage + Send + Sync),
236) -> Result<()> {
237    let mut arr = vec![(common::CATEGORY_BASIC, None)];
238    for key in conf.servers.keys() {
239        arr.push((common::CATEGORY_SERVER, Some(key.as_str())));
240    }
241    for key in conf.locations.keys() {
242        arr.push((common::CATEGORY_LOCATION, Some(key.as_str())));
243    }
244    for key in conf.upstreams.keys() {
245        arr.push((common::CATEGORY_UPSTREAM, Some(key.as_str())));
246    }
247    for key in conf.plugins.keys() {
248        arr.push((common::CATEGORY_PLUGIN, Some(key.as_str())));
249    }
250    for key in conf.certificates.keys() {
251        arr.push((common::CATEGORY_CERTIFICATE, Some(key.as_str())));
252    }
253    for key in conf.storages.keys() {
254        arr.push((common::CATEGORY_STORAGE, Some(key.as_str())));
255    }
256    for (category, name) in arr {
257        storage.save_config(conf, category, name).await?;
258    }
259    Ok(())
260}
261
262pub use common::*;
263pub use etcd::{EtcdStorage, ETCD_PROTOCOL};
264pub use file::FileStorage;
265
#[cfg(test)]
mod tests {
    use super::{
        get_config_storage, load_config, support_observer, sync_to_path,
        try_init_config_storage, LoadConfigOptions,
    };
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;

    /// Initializes the global storage from `./conf`, loads the configuration,
    /// and then syncs it to a file inside a temporary directory.
    #[tokio::test]
    async fn test_load_config() {
        // The global storage must be empty before initialization.
        assert!(get_config_storage().is_none());
        try_init_config_storage("./conf").unwrap();
        let conf = load_config(LoadConfigOptions {
            replace_include: true,
            ..Default::default()
        })
        .await
        .unwrap();
        assert_eq!(8, conf.hash().unwrap().len());
        assert!(!support_observer());
        assert!(get_config_storage().is_some());

        // Keep the `TempDir` guard alive instead of calling `keep()`, so the
        // directory is removed when the test ends rather than left on disk.
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("pingap.toml");
        tokio::fs::write(&file, b"").await.unwrap();
        sync_to_path(file.to_string_lossy().as_ref()).await.unwrap();
    }
}