1use async_trait::async_trait;
17use etcd_client::WatchStream;
18use glob::glob;
19use once_cell::sync::OnceCell;
20use snafu::Snafu;
21use std::time::Duration;
22use tokio::fs;
23use tracing::debug;
24
25mod common;
26mod etcd;
27mod file;
28
/// Errors produced by the configuration module.
///
/// Each variant wraps the underlying library error as `source` (where there
/// is one) and, for some variants, an extra field naming the offending input.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Generic failure described only by a free-form message
    /// (e.g. the global storage has not been initialized yet).
    #[snafu(display("Invalid error {message}"))]
    Invalid { message: String },
    /// A glob pattern could not be compiled; `path` records the path the
    /// pattern was built from.
    #[snafu(display("Glob pattern error {source}, {path}"))]
    Pattern {
        source: glob::PatternError,
        path: String,
    },
    /// Iterating over glob matches failed for an entry.
    #[snafu(display("Glob error {source}"))]
    Glob { source: glob::GlobError },
    /// An I/O operation failed; `file` names the file involved.
    #[snafu(display("Io error {source}, {file}"))]
    Io {
        source: std::io::Error,
        file: String,
    },
    /// TOML deserialization failed.
    #[snafu(display("Toml de error {source}"))]
    De { source: toml::de::Error },
    /// TOML serialization failed.
    #[snafu(display("Toml ser error {source}"))]
    Ser { source: toml::ser::Error },
    /// A URL could not be parsed; `url` is the offending input.
    #[snafu(display("Url parse error {source}, {url}"))]
    UrlParse {
        source: url::ParseError,
        url: String,
    },
    /// A network address could not be parsed; `addr` is the offending input.
    #[snafu(display("Addr parse error {source}, {addr}"))]
    AddrParse {
        source: std::net::AddrParseError,
        addr: String,
    },
    /// Base64 decoding failed.
    #[snafu(display("Base64 decode error {source}"))]
    Base64Decode { source: base64::DecodeError },
    /// A regular expression could not be compiled or used.
    #[snafu(display("Regex error {source}"))]
    Regex { source: regex::Error },
    /// An etcd client operation failed.
    #[snafu(display("Etcd error {source}"))]
    Etcd { source: etcd_client::Error },
}
/// Module-local result alias whose error type defaults to [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
68
/// Handle used to wait for configuration change notifications.
///
/// It optionally wraps an etcd watch stream; when no stream is present,
/// [`Observer::watch`] only sleeps and reports that nothing changed.
pub struct Observer {
    /// Watch stream from the etcd client, or `None` when the storage backend
    /// provides no stream (the default returned by `ConfigStorage::observe`).
    etcd_watch_stream: Option<WatchStream>,
}
74
75impl Observer {
76 pub async fn watch(&mut self) -> Result<bool> {
78 let sleep_time = Duration::from_secs(30);
79 let Some(stream) = self.etcd_watch_stream.as_mut() else {
81 tokio::time::sleep(sleep_time).await;
82 return Ok(false);
83 };
84 let resp = stream
85 .message()
86 .await
87 .map_err(|e| Error::Etcd { source: e })?;
88
89 Ok(resp.is_some())
90 }
91}
92
/// Options passed to `load_config` to control how the configuration is loaded.
///
/// Both flags default to `false` (via `Default`).
#[derive(Debug, Default, Clone)]
pub struct LoadConfigOptions {
    // NOTE(review): presumably asks the loader to expand "include" references
    // in the loaded configuration — confirm against the storage implementations.
    pub replace_include: bool,
    // NOTE(review): presumably marks a load performed on behalf of the admin
    // interface — confirm against the storage implementations.
    pub admin: bool,
}
99
/// Abstraction over a configuration storage backend.
///
/// In this module it is instantiated as either `EtcdStorage` or
/// `FileStorage` (see `new_config_storage`).
#[async_trait]
pub trait ConfigStorage {
    /// Loads the configuration from the backend using the given options.
    async fn load_config(&self, opts: LoadConfigOptions) -> Result<PingapConf>;

    /// Persists part of `conf` to the backend.
    ///
    /// `category` selects the configuration section (one of the
    /// `CATEGORY_*` constants) and `name` optionally selects a single entry
    /// within that section.
    async fn save_config(
        &self,
        conf: &PingapConf,
        category: &str,
        name: Option<&str>,
    ) -> Result<()>;

    /// Reports whether this backend can observe changes.
    /// The default implementation returns `false`.
    fn support_observer(&self) -> bool {
        false
    }

    /// Creates an [`Observer`] for this backend.
    /// The default implementation returns an observer without a watch stream.
    async fn observe(&self) -> Result<Observer> {
        Ok(Observer {
            etcd_watch_stream: None,
        })
    }

    /// Stores raw bytes under `key`.
    async fn save(&self, key: &str, data: &[u8]) -> Result<()>;
    /// Loads the raw bytes stored under `key`.
    async fn load(&self, key: &str) -> Result<Vec<u8>>;
}
130
/// Process-wide configuration storage.
///
/// Set at most once by `try_init_config_storage`; the module-level helpers
/// (`load_config`, `save_config`, `support_observer`, `get_config_storage`)
/// read from it.
static CONFIG_STORAGE: OnceCell<Box<(dyn ConfigStorage + Sync + Send)>> =
    OnceCell::new();
134
135fn new_config_storage(
138 path: &str,
139) -> Result<Box<(dyn ConfigStorage + Sync + Send)>> {
140 let s: Box<(dyn ConfigStorage + Sync + Send)> =
141 if path.starts_with(ETCD_PROTOCOL) {
142 let storage = EtcdStorage::new(path)?;
143 Box::new(storage)
144 } else {
145 let storage = FileStorage::new(path)?;
146 Box::new(storage)
147 };
148 Ok(s)
149}
150
151pub fn try_init_config_storage(
160 path: &str,
161) -> Result<&'static (dyn ConfigStorage + Sync + Send)> {
162 let conf = CONFIG_STORAGE.get_or_try_init(|| new_config_storage(path))?;
165 Ok(conf.as_ref())
167}
168
169pub async fn load_config(opts: LoadConfigOptions) -> Result<PingapConf> {
170 let Some(storage) = CONFIG_STORAGE.get() else {
171 return Err(Error::Invalid {
172 message: "storage is not inited".to_string(),
173 });
174 };
175 storage.load_config(opts).await
176}
177
178pub async fn read_all_toml_files(dir: &str) -> Result<Vec<u8>> {
179 let mut data = vec![];
180 for entry in
181 glob(&format!("{dir}/**/*.toml")).map_err(|e| Error::Pattern {
182 source: e,
183 path: dir.to_string(),
184 })?
185 {
186 let f = entry.map_err(|e| Error::Glob { source: e })?;
187 let mut buf = fs::read(&f).await.map_err(|e| Error::Io {
188 source: e,
189 file: f.to_string_lossy().to_string(),
190 })?;
191 debug!(filename = format!("{f:?}"), "read toml file");
192 data.append(&mut buf);
194 data.push(0x0a);
195 }
196 Ok(data)
197}
198
199pub fn support_observer() -> bool {
200 if let Some(storage) = CONFIG_STORAGE.get() {
201 storage.support_observer()
202 } else {
203 false
204 }
205}
206
207pub fn get_config_storage() -> Option<&'static (dyn ConfigStorage + Sync + Send)>
208{
209 if let Some(storage) = CONFIG_STORAGE.get() {
210 Some(storage.as_ref())
211 } else {
212 None
213 }
214}
215
216pub async fn save_config(
217 conf: &PingapConf,
218 category: &str,
219 name: Option<&str>,
220) -> Result<()> {
221 let Some(storage) = CONFIG_STORAGE.get() else {
222 return Err(Error::Invalid {
223 message: "storage is not inited".to_string(),
224 });
225 };
226 storage.save_config(conf, category, name).await
227}
228pub async fn sync_to_path(path: &str) -> Result<()> {
229 let conf = get_current_config();
230 let storage = new_config_storage(path)?;
231 sync_config(&conf, storage.as_ref()).await
232}
233
234pub async fn sync_config(
235 conf: &PingapConf,
236 storage: &(dyn ConfigStorage + Send + Sync),
237) -> Result<()> {
238 let mut arr = vec![(common::CATEGORY_BASIC, None)];
239 for key in conf.servers.keys() {
240 arr.push((common::CATEGORY_SERVER, Some(key.as_str())));
241 }
242 for key in conf.locations.keys() {
243 arr.push((common::CATEGORY_LOCATION, Some(key.as_str())));
244 }
245 for key in conf.upstreams.keys() {
246 arr.push((common::CATEGORY_UPSTREAM, Some(key.as_str())));
247 }
248 for key in conf.plugins.keys() {
249 arr.push((common::CATEGORY_PLUGIN, Some(key.as_str())));
250 }
251 for key in conf.certificates.keys() {
252 arr.push((common::CATEGORY_CERTIFICATE, Some(key.as_str())));
253 }
254 for key in conf.storages.keys() {
255 arr.push((common::CATEGORY_STORAGE, Some(key.as_str())));
256 }
257 for (category, name) in arr {
258 storage.save_config(conf, category, name).await?;
259 }
260 Ok(())
261}
262
263pub use common::*;
264pub use etcd::{EtcdStorage, ETCD_PROTOCOL};
265pub use file::FileStorage;
266
#[cfg(test)]
mod tests {
    use super::{
        get_config_storage, load_config, support_observer, sync_to_path,
        try_init_config_storage, LoadConfigOptions,
    };
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;

    /// Exercises the global storage lifecycle: uninitialized -> initialized
    /// from `./conf` -> config loaded -> config synced to a temporary file.
    #[tokio::test]
    async fn test_load_config() {
        // The global storage must start out uninitialized.
        assert!(get_config_storage().is_none());
        try_init_config_storage("./conf").unwrap();
        let conf = load_config(LoadConfigOptions {
            replace_include: true,
            ..Default::default()
        })
        .await
        .unwrap();
        assert_eq!(8, conf.hash().unwrap().len());
        assert!(!support_observer());
        assert!(get_config_storage().is_some());

        // Keep the `TempDir` guard alive until the end of the test so the
        // directory is removed on drop instead of being leaked on disk
        // (which `into_path` would do).
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("pingap.toml");
        tokio::fs::write(&file, b"").await.unwrap();
        sync_to_path(file.to_string_lossy().as_ref()).await.unwrap();
    }
}