pingap_config/
lib.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// External crate imports for async operations, etcd client, and error handling
16use async_trait::async_trait;
17use etcd_client::WatchStream;
18use glob::glob;
19use once_cell::sync::OnceCell;
20use snafu::Snafu;
21use std::time::Duration;
22use tokio::fs;
23use tracing::debug;
24
25mod common;
26mod etcd;
27mod file;
28
29// Error enum for all possible configuration-related errors
30#[derive(Debug, Snafu)]
31pub enum Error {
32    #[snafu(display("Invalid error {message}"))]
33    Invalid { message: String },
34    #[snafu(display("Glob pattern error {source}, {path}"))]
35    Pattern {
36        source: glob::PatternError,
37        path: String,
38    },
39    #[snafu(display("Glob error {source}"))]
40    Glob { source: glob::GlobError },
41    #[snafu(display("Io error {source}, {file}"))]
42    Io {
43        source: std::io::Error,
44        file: String,
45    },
46    #[snafu(display("Toml de error {source}"))]
47    De { source: toml::de::Error },
48    #[snafu(display("Toml ser error {source}"))]
49    Ser { source: toml::ser::Error },
50    #[snafu(display("Url parse error {source}, {url}"))]
51    UrlParse {
52        source: url::ParseError,
53        url: String,
54    },
55    #[snafu(display("Addr parse error {source}, {addr}"))]
56    AddrParse {
57        source: std::net::AddrParseError,
58        addr: String,
59    },
60    #[snafu(display("Base64 decode error {source}"))]
61    Base64Decode { source: base64::DecodeError },
62    #[snafu(display("Regex error {source}"))]
63    Regex { source: regex::Error },
64    #[snafu(display("Etcd error {source}"))]
65    Etcd { source: etcd_client::Error },
66}
67type Result<T, E = Error> = std::result::Result<T, E>;
68
69// Observer struct for watching configuration changes
70pub struct Observer {
71    // Optional watch stream for etcd-based configuration
72    etcd_watch_stream: Option<WatchStream>,
73}
74
75impl Observer {
76    // Watches for configuration changes, returns true if changes detected
77    pub async fn watch(&mut self) -> Result<bool> {
78        let sleep_time = Duration::from_secs(30);
79        // no watch stream, just sleep a moment
80        let Some(stream) = self.etcd_watch_stream.as_mut() else {
81            tokio::time::sleep(sleep_time).await;
82            return Ok(false);
83        };
84        let resp = stream
85            .message()
86            .await
87            .map_err(|e| Error::Etcd { source: e })?;
88
89        Ok(resp.is_some())
90    }
91}
92
/// Flags that control how a configuration is loaded.
#[derive(Debug, Default, Clone)]
pub struct LoadConfigOptions {
    /// Whether include directives should be replaced while loading.
    pub replace_include: bool,
    /// Whether the configuration is being loaded for admin use.
    pub admin: bool,
}
99
100// Trait defining storage backend operations for configuration
101#[async_trait]
102pub trait ConfigStorage {
103    // Load configuration with specified options
104    async fn load_config(&self, opts: LoadConfigOptions) -> Result<PingapConf>;
105
106    // Save configuration for a specific category and optional name
107    async fn save_config(
108        &self,
109        conf: &PingapConf,
110        category: &str,
111        name: Option<&str>,
112    ) -> Result<()>;
113
114    // Whether this storage supports change observation
115    fn support_observer(&self) -> bool {
116        false
117    }
118
119    // Create an observer for this storage
120    async fn observe(&self) -> Result<Observer> {
121        Ok(Observer {
122            etcd_watch_stream: None,
123        })
124    }
125
126    // Low-level storage operations
127    async fn save(&self, key: &str, data: &[u8]) -> Result<()>;
128    async fn load(&self, key: &str) -> Result<Vec<u8>>;
129}
130
131// Global static storage for the configuration backend
132static CONFIG_STORAGE: OnceCell<Box<(dyn ConfigStorage + Sync + Send)>> =
133    OnceCell::new();
134
135// Creates a new configuration storage based on the path
136// Supports both etcd:// and file:// protocols
137fn new_config_storage(
138    path: &str,
139) -> Result<Box<(dyn ConfigStorage + Sync + Send)>> {
140    let s: Box<(dyn ConfigStorage + Sync + Send)> =
141        if path.starts_with(ETCD_PROTOCOL) {
142            let storage = EtcdStorage::new(path)?;
143            Box::new(storage)
144        } else {
145            let storage = FileStorage::new(path)?;
146            Box::new(storage)
147        };
148    Ok(s)
149}
150
151/// Initializes the configuration storage system if it hasn't been initialized yet
152///
153/// # Arguments
154/// * `path` - A string path that can be either a file path (file://) or etcd path (etcd://)
155///
156/// # Returns
157/// * `Result<&'static (dyn ConfigStorage + Sync + Send)>` - A reference to the initialized storage
158///   The reference has a static lifetime since it's stored in a global OnceCell
159pub fn try_init_config_storage(
160    path: &str,
161) -> Result<&'static (dyn ConfigStorage + Sync + Send)> {
162    // Attempts to get existing storage or initialize a new one if not present
163    // get_or_try_init ensures this initialization happens only once
164    let conf = CONFIG_STORAGE.get_or_try_init(|| new_config_storage(path))?;
165    // Returns a reference to the storage implementation
166    Ok(conf.as_ref())
167}
168
169pub async fn load_config(opts: LoadConfigOptions) -> Result<PingapConf> {
170    let Some(storage) = CONFIG_STORAGE.get() else {
171        return Err(Error::Invalid {
172            message: "storage is not inited".to_string(),
173        });
174    };
175    storage.load_config(opts).await
176}
177
178pub async fn read_all_toml_files(dir: &str) -> Result<Vec<u8>> {
179    let mut data = vec![];
180    for entry in
181        glob(&format!("{dir}/**/*.toml")).map_err(|e| Error::Pattern {
182            source: e,
183            path: dir.to_string(),
184        })?
185    {
186        let f = entry.map_err(|e| Error::Glob { source: e })?;
187        let mut buf = fs::read(&f).await.map_err(|e| Error::Io {
188            source: e,
189            file: f.to_string_lossy().to_string(),
190        })?;
191        debug!(filename = format!("{f:?}"), "read toml file");
192        // Append file contents and newline
193        data.append(&mut buf);
194        data.push(0x0a);
195    }
196    Ok(data)
197}
198
199pub fn support_observer() -> bool {
200    if let Some(storage) = CONFIG_STORAGE.get() {
201        storage.support_observer()
202    } else {
203        false
204    }
205}
206
207pub fn get_config_storage() -> Option<&'static (dyn ConfigStorage + Sync + Send)>
208{
209    if let Some(storage) = CONFIG_STORAGE.get() {
210        Some(storage.as_ref())
211    } else {
212        None
213    }
214}
215
216pub async fn save_config(
217    conf: &PingapConf,
218    category: &str,
219    name: Option<&str>,
220) -> Result<()> {
221    let Some(storage) = CONFIG_STORAGE.get() else {
222        return Err(Error::Invalid {
223            message: "storage is not inited".to_string(),
224        });
225    };
226    storage.save_config(conf, category, name).await
227}
228pub async fn sync_to_path(path: &str) -> Result<()> {
229    let conf = get_current_config();
230    let storage = new_config_storage(path)?;
231    sync_config(&conf, storage.as_ref()).await
232}
233
234pub async fn sync_config(
235    conf: &PingapConf,
236    storage: &(dyn ConfigStorage + Send + Sync),
237) -> Result<()> {
238    let mut arr = vec![(common::CATEGORY_BASIC, None)];
239    for key in conf.servers.keys() {
240        arr.push((common::CATEGORY_SERVER, Some(key.as_str())));
241    }
242    for key in conf.locations.keys() {
243        arr.push((common::CATEGORY_LOCATION, Some(key.as_str())));
244    }
245    for key in conf.upstreams.keys() {
246        arr.push((common::CATEGORY_UPSTREAM, Some(key.as_str())));
247    }
248    for key in conf.plugins.keys() {
249        arr.push((common::CATEGORY_PLUGIN, Some(key.as_str())));
250    }
251    for key in conf.certificates.keys() {
252        arr.push((common::CATEGORY_CERTIFICATE, Some(key.as_str())));
253    }
254    for key in conf.storages.keys() {
255        arr.push((common::CATEGORY_STORAGE, Some(key.as_str())));
256    }
257    for (category, name) in arr {
258        storage.save_config(conf, category, name).await?;
259    }
260    Ok(())
261}
262
263pub use common::*;
264pub use etcd::{EtcdStorage, ETCD_PROTOCOL};
265pub use file::FileStorage;
266
#[cfg(test)]
mod tests {
    use super::{
        get_config_storage, load_config, support_observer, sync_to_path,
        try_init_config_storage, LoadConfigOptions,
    };
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;

    // Exercises the global storage life cycle: uninitialized -> initialized
    // from `./conf` -> loaded -> synced to a temporary file.
    #[tokio::test]
    async fn test_load_config() {
        assert!(get_config_storage().is_none());
        try_init_config_storage("./conf").unwrap();
        let conf = load_config(LoadConfigOptions {
            replace_include: true,
            ..Default::default()
        })
        .await
        .unwrap();
        assert_eq!(8, conf.hash().unwrap().len());
        assert!(!support_observer());
        assert!(get_config_storage().is_some());

        // Keep the `TempDir` guard alive until the end of the test so the
        // directory is removed on drop instead of being left behind.
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("pingap.toml");
        tokio::fs::write(&file, b"").await.unwrap();
        sync_to_path(file.to_string_lossy().as_ref()).await.unwrap();
    }
}