1use etcd_client::WatchStream;
17use glob::glob;
18use snafu::Snafu;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::fs;
23use tracing::debug;
24
25mod common;
26mod etcd_storage;
27mod file_storage;
28pub mod hcl;
29mod manager;
30mod storage;
31
32#[derive(Debug, Snafu)]
34pub enum Error {
35 #[snafu(display("Invalid error {message}"))]
36 Invalid { message: String },
37 #[snafu(display("Glob pattern error {source}, {path}"))]
38 Pattern {
39 source: glob::PatternError,
40 path: String,
41 },
42 #[snafu(display("Glob error {source}"))]
43 Glob { source: glob::GlobError },
44 #[snafu(display("Io error {source}, {file}"))]
45 Io {
46 source: std::io::Error,
47 file: String,
48 },
49 #[snafu(display("Toml de error {source}"))]
50 De { source: toml::de::Error },
51 #[snafu(display("Toml ser error {source}"))]
52 Ser { source: toml::ser::Error },
53 #[snafu(display("Url parse error {source}, {url}"))]
54 UrlParse {
55 source: url::ParseError,
56 url: String,
57 },
58 #[snafu(display("Addr parse error {source}, {addr}"))]
59 AddrParse {
60 source: std::net::AddrParseError,
61 addr: String,
62 },
63 #[snafu(display("Base64 decode error {source}"))]
64 Base64Decode { source: base64::DecodeError },
65 #[snafu(display("Regex error {source}"))]
66 Regex { source: regex::Error },
67 #[snafu(display("Etcd error {source}"))]
68 Etcd { source: Box<etcd_client::Error> },
69}
70type Result<T, E = Error> = std::result::Result<T, E>;
71
72pub struct Observer {
74 etcd_watch_stream: Option<WatchStream>,
76}
77
78impl Observer {
79 pub async fn watch(&mut self) -> Result<bool> {
81 let sleep_time = Duration::from_secs(30);
82 let Some(stream) = self.etcd_watch_stream.as_mut() else {
84 tokio::time::sleep(sleep_time).await;
85 return Ok(false);
86 };
87 let resp = stream.message().await.map_err(|e| Error::Etcd {
88 source: Box::new(e),
89 })?;
90
91 Ok(resp.is_some())
92 }
93}
94
/// Kinds of configuration entries.
///
/// The [`Display`](std::fmt::Display) form of each variant is its
/// lowercase name (for example `Category::Server` renders as `"server"`).
/// `Eq` and `Hash` are derived so a category can be used as a key in hashed
/// collections.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub enum Category {
    Basic,
    Server,
    Location,
    Upstream,
    Plugin,
    Certificate,
    Storage,
}

impl std::fmt::Display for Category {
    /// Writes the lowercase name of the category.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Category::Basic => "basic",
            Category::Server => "server",
            Category::Location => "location",
            Category::Upstream => "upstream",
            Category::Plugin => "plugin",
            Category::Certificate => "certificate",
            Category::Storage => "storage",
        };
        // `write_str` (not `pad`) so width/alignment flags stay ignored.
        f.write_str(name)
    }
}
121impl FromStr for Category {
122 type Err = Error;
123
124 fn from_str(s: &str) -> Result<Self, Self::Err> {
125 match s.to_lowercase().as_str() {
126 "basic" => Ok(Category::Basic),
127 "server" => Ok(Category::Server),
128 "location" => Ok(Category::Location),
129 "upstream" => Ok(Category::Upstream),
130 "plugin" => Ok(Category::Plugin),
131 "certificate" => Ok(Category::Certificate),
132 "storage" => Ok(Category::Storage),
133 _ => Err(Error::Invalid {
134 message: format!("invalid category: {s}"),
135 }),
136 }
137 }
138}
139
140pub fn new_config_manager(value: &str) -> Result<ConfigManager> {
141 if value.starts_with(etcd_storage::ETCD_PROTOCOL) {
142 new_etcd_config_manager(value)
143 } else {
144 new_file_config_manager(value)
145 }
146}
147
148fn permission_error_message(
152 path: &std::path::Path,
153 source: std::io::Error,
154) -> Error {
155 #[cfg(unix)]
156 {
157 use std::os::unix::fs::MetadataExt;
158 if source.kind() == std::io::ErrorKind::PermissionDenied
159 && let Ok(meta) = std::fs::metadata(path)
160 {
161 let mode = meta.mode() & 0o7777;
162 let uid = meta.uid();
163 let gid = meta.gid();
164 return Error::Invalid {
165 message: format!(
166 "Config file '{}' is not readable: permission denied \
167 (owner uid:{} gid:{}, mode:{:04o}). \
168 Please ensure the pingap process user can read this file, \
169 e.g.: chown <pingap-user>:<pingap-group> '{}' or chmod o+r '{}'",
170 path.display(),
171 uid,
172 gid,
173 mode,
174 path.display(),
175 path.display(),
176 ),
177 };
178 }
179 }
180 Error::Io {
181 source,
182 file: path.to_string_lossy().to_string(),
183 }
184}
185
186pub async fn read_all_config_files(dir: &str) -> Result<Vec<u8>> {
187 let mut data = vec![];
188 let toml_files: std::result::Result<Vec<_>, _> =
190 glob(&format!("{dir}/**/*.toml"))
191 .map_err(|e| Error::Pattern {
192 source: e,
193 path: dir.to_string(),
194 })?
195 .collect();
196 let toml_files = toml_files.map_err(|e| Error::Glob { source: e })?;
197
198 if !toml_files.is_empty() {
199 for f in toml_files {
201 let mut buf = fs::read(&f)
202 .await
203 .map_err(|e| permission_error_message(&f, e))?;
204 debug!(filename = format!("{f:?}"), "read toml file");
205 data.append(&mut buf);
206 data.push(0x0a);
207 }
208 } else {
209 for entry in
211 glob(&format!("{dir}/**/*.hcl")).map_err(|e| Error::Pattern {
212 source: e,
213 path: dir.to_string(),
214 })?
215 {
216 let f = entry.map_err(|e| Error::Glob { source: e })?;
217 let buf = fs::read(&f)
218 .await
219 .map_err(|e| permission_error_message(&f, e))?;
220 debug!(filename = format!("{f:?}"), "read hcl file");
221 let hcl_str = String::from_utf8_lossy(&buf);
222 let toml_str = hcl::convert_hcl_to_toml(&hcl_str)?;
223 data.extend_from_slice(toml_str.as_bytes());
224 data.push(0x0a);
225 }
226 }
227 Ok(data)
228}
229
230pub async fn sync_to_path(
231 config_manager: Arc<ConfigManager>,
232 path: &str,
233) -> Result<()> {
234 let config = config_manager.get_current_config();
235 let config = toml::to_string_pretty(&config.as_ref().clone())
236 .map_err(|e| Error::Ser { source: e })?;
237 let config = toml::from_str::<PingapTomlConfig>(&config)
238 .map_err(|e| Error::De { source: e })?;
239 let new_config_manager = new_config_manager(path)?;
240 new_config_manager.save_all(&config).await?;
241 Ok(())
242}
243
244pub use common::*;
245pub use etcd_storage::ETCD_PROTOCOL;
246pub use manager::*;
247pub use storage::*;