1use async_trait::async_trait;
17use etcd_client::WatchStream;
18use glob::glob;
19use once_cell::sync::OnceCell;
20use snafu::Snafu;
21use std::time::Duration;
22use tokio::fs;
23use tracing::debug;
24
25mod common;
26mod etcd;
27mod file;
28
29#[derive(Debug, Snafu)]
31pub enum Error {
32 #[snafu(display("Invalid error {message}"))]
33 Invalid { message: String },
34 #[snafu(display("Glob pattern error {source}, {path}"))]
35 Pattern {
36 source: glob::PatternError,
37 path: String,
38 },
39 #[snafu(display("Glob error {source}"))]
40 Glob { source: glob::GlobError },
41 #[snafu(display("Io error {source}, {file}"))]
42 Io {
43 source: std::io::Error,
44 file: String,
45 },
46 #[snafu(display("Toml de error {source}"))]
47 De { source: toml::de::Error },
48 #[snafu(display("Toml ser error {source}"))]
49 Ser { source: toml::ser::Error },
50 #[snafu(display("Url parse error {source}, {url}"))]
51 UrlParse {
52 source: url::ParseError,
53 url: String,
54 },
55 #[snafu(display("Addr parse error {source}, {addr}"))]
56 AddrParse {
57 source: std::net::AddrParseError,
58 addr: String,
59 },
60 #[snafu(display("Base64 decode error {source}"))]
61 Base64Decode { source: base64::DecodeError },
62 #[snafu(display("Regex error {source}"))]
63 Regex { source: regex::Error },
64 #[snafu(display("Etcd error {source}"))]
65 Etcd { source: Box<etcd_client::Error> },
66}
67type Result<T, E = Error> = std::result::Result<T, E>;
68
69pub struct Observer {
71 etcd_watch_stream: Option<WatchStream>,
73}
74
75impl Observer {
76 pub async fn watch(&mut self) -> Result<bool> {
78 let sleep_time = Duration::from_secs(30);
79 let Some(stream) = self.etcd_watch_stream.as_mut() else {
81 tokio::time::sleep(sleep_time).await;
82 return Ok(false);
83 };
84 let resp = stream.message().await.map_err(|e| Error::Etcd {
85 source: Box::new(e),
86 })?;
87
88 Ok(resp.is_some())
89 }
90}
91
/// Options passed to [`ConfigStorage::load_config`].
///
/// Both flags default to `false`.
#[derive(Debug, Default, Clone)]
pub struct LoadConfigOptions {
    // NOTE(review): presumably controls whether include directives are
    // expanded while loading; confirm against the storage implementations.
    pub replace_include: bool,
    // NOTE(review): presumably marks a load performed for the admin
    // interface; confirm against the storage implementations.
    pub admin: bool,
}
98
99#[async_trait]
101pub trait ConfigStorage {
102 async fn load_config(&self, opts: LoadConfigOptions) -> Result<PingapConf>;
104
105 async fn save_config(
107 &self,
108 conf: &PingapConf,
109 category: &str,
110 name: Option<&str>,
111 ) -> Result<()>;
112
113 fn support_observer(&self) -> bool {
115 false
116 }
117
118 async fn observe(&self) -> Result<Observer> {
120 Ok(Observer {
121 etcd_watch_stream: None,
122 })
123 }
124
125 async fn save(&self, key: &str, data: &[u8]) -> Result<()>;
127 async fn load(&self, key: &str) -> Result<Vec<u8>>;
128}
129
130static CONFIG_STORAGE: OnceCell<Box<(dyn ConfigStorage + Sync + Send)>> =
132 OnceCell::new();
133
134fn new_config_storage(
137 path: &str,
138) -> Result<Box<(dyn ConfigStorage + Sync + Send)>> {
139 let s: Box<(dyn ConfigStorage + Sync + Send)> =
140 if path.starts_with(ETCD_PROTOCOL) {
141 let storage = EtcdStorage::new(path)?;
142 Box::new(storage)
143 } else {
144 let storage = FileStorage::new(path)?;
145 Box::new(storage)
146 };
147 Ok(s)
148}
149
150pub fn try_init_config_storage(
159 path: &str,
160) -> Result<&'static (dyn ConfigStorage + Sync + Send)> {
161 let conf = CONFIG_STORAGE.get_or_try_init(|| new_config_storage(path))?;
164 Ok(conf.as_ref())
166}
167
168pub async fn load_config(opts: LoadConfigOptions) -> Result<PingapConf> {
169 let Some(storage) = CONFIG_STORAGE.get() else {
170 return Err(Error::Invalid {
171 message: "storage is not inited".to_string(),
172 });
173 };
174 storage.load_config(opts).await
175}
176
177pub async fn read_all_toml_files(dir: &str) -> Result<Vec<u8>> {
178 let mut data = vec![];
179 for entry in
180 glob(&format!("{dir}/**/*.toml")).map_err(|e| Error::Pattern {
181 source: e,
182 path: dir.to_string(),
183 })?
184 {
185 let f = entry.map_err(|e| Error::Glob { source: e })?;
186 let mut buf = fs::read(&f).await.map_err(|e| Error::Io {
187 source: e,
188 file: f.to_string_lossy().to_string(),
189 })?;
190 debug!(filename = format!("{f:?}"), "read toml file");
191 data.append(&mut buf);
193 data.push(0x0a);
194 }
195 Ok(data)
196}
197
198pub fn support_observer() -> bool {
199 if let Some(storage) = CONFIG_STORAGE.get() {
200 storage.support_observer()
201 } else {
202 false
203 }
204}
205
206pub fn get_config_storage() -> Option<&'static (dyn ConfigStorage + Sync + Send)>
207{
208 if let Some(storage) = CONFIG_STORAGE.get() {
209 Some(storage.as_ref())
210 } else {
211 None
212 }
213}
214
215pub async fn save_config(
216 conf: &PingapConf,
217 category: &str,
218 name: Option<&str>,
219) -> Result<()> {
220 let Some(storage) = CONFIG_STORAGE.get() else {
221 return Err(Error::Invalid {
222 message: "storage is not inited".to_string(),
223 });
224 };
225 storage.save_config(conf, category, name).await
226}
227pub async fn sync_to_path(path: &str) -> Result<()> {
228 let conf = get_current_config();
229 let storage = new_config_storage(path)?;
230 sync_config(&conf, storage.as_ref()).await
231}
232
233pub async fn sync_config(
234 conf: &PingapConf,
235 storage: &(dyn ConfigStorage + Send + Sync),
236) -> Result<()> {
237 let mut arr = vec![(common::CATEGORY_BASIC, None)];
238 for key in conf.servers.keys() {
239 arr.push((common::CATEGORY_SERVER, Some(key.as_str())));
240 }
241 for key in conf.locations.keys() {
242 arr.push((common::CATEGORY_LOCATION, Some(key.as_str())));
243 }
244 for key in conf.upstreams.keys() {
245 arr.push((common::CATEGORY_UPSTREAM, Some(key.as_str())));
246 }
247 for key in conf.plugins.keys() {
248 arr.push((common::CATEGORY_PLUGIN, Some(key.as_str())));
249 }
250 for key in conf.certificates.keys() {
251 arr.push((common::CATEGORY_CERTIFICATE, Some(key.as_str())));
252 }
253 for key in conf.storages.keys() {
254 arr.push((common::CATEGORY_STORAGE, Some(key.as_str())));
255 }
256 for (category, name) in arr {
257 storage.save_config(conf, category, name).await?;
258 }
259 Ok(())
260}
261
262pub use common::*;
263pub use etcd::{EtcdStorage, ETCD_PROTOCOL};
264pub use file::FileStorage;
265
#[cfg(test)]
mod tests {
    use super::{
        get_config_storage, load_config, support_observer, sync_to_path,
        try_init_config_storage, LoadConfigOptions,
    };
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;

    /// Exercises the global storage lifecycle: uninitialized, initialized
    /// from `./conf`, loading the config, then syncing it to a temp file.
    #[tokio::test]
    async fn test_load_config() {
        // The global storage must be empty before initialization.
        assert!(get_config_storage().is_none());
        try_init_config_storage("./conf").unwrap();

        let conf = load_config(LoadConfigOptions {
            replace_include: true,
            ..Default::default()
        })
        .await
        .unwrap();
        assert_eq!(8, conf.hash().unwrap().len());
        assert!(!support_observer());
        assert!(get_config_storage().is_some());

        // Hold the `TempDir` guard for the rest of the test so the
        // directory is deleted on drop instead of being left on disk.
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("pingap.toml");
        tokio::fs::write(&file, b"").await.unwrap();
        sync_to_path(file.to_string_lossy().as_ref()).await.unwrap();
    }
}