spacegate_config/service/fs/
mod.rs1use std::{
2 collections::HashSet,
3 ffi::{OsStr, OsString},
4 os::unix::ffi::OsStrExt,
5 path::{Path, PathBuf},
6 sync::Arc,
7 time::SystemTime,
8};
9pub mod model;
10use spacegate_model::{BoxError, BoxResult, Config, ConfigItem, PluginInstanceId, SgHttpRoute};
11use tokio::sync::{Mutex, RwLock};
12
13use crate::service::config_format::ConfigFormat;
14
/// Name of the directory, directly under the config root, that holds gateway configs.
pub const GATEWAY_DIR: &str = "gateway";
/// Name of the per-gateway directory that holds that gateway's route files.
pub const ROUTE_DIR: &str = "route";
/// Name of the directory, directly under the config root, that holds plugin files.
pub const PLUGIN_DIR: &str = "plugin";
/// File stem of the main config file, both at the config root and inside each gateway directory.
pub const MODULE_FILE_NAME: &str = "config";
19#[derive(Debug, Clone)]
36pub struct Fs<F> {
37 pub dir: Arc<Path>,
38 pub format: F,
39 pub buffer: Arc<Mutex<Vec<u8>>>,
40 pub prev_retrieve_time: Arc<RwLock<SystemTime>>,
41 pub cached: Arc<RwLock<Config>>,
42 }
45
46impl<F> Fs<F>
47where
48 F: ConfigFormat,
49{
50 pub fn entrance_config_path(&self) -> PathBuf {
51 self.dir.join(MODULE_FILE_NAME).with_extension(self.format.extension())
52 }
53
54 pub async fn collect_config(&self) -> Result<Config, BoxError> {
55 tracing::trace!("retrieve config");
56 let mut config = {
58 let main_config: Config = self.format.de(&tokio::fs::read(self.entrance_config_path()).await?)?;
59 main_config
60 };
61 if let Ok(mut plugin_dir) = tokio::fs::read_dir(self.plugin_dir()).await.inspect_err(|e| tracing::debug!("fail to read plugin dir {e}")) {
63 while let Some(entry) = plugin_dir.next_entry().await? {
64 let Ok(file_name) = entry.file_name().into_string() else {
65 continue;
66 };
67 let plugin_id = PluginInstanceId::from_file_stem(&file_name);
68 let spec: serde_json::Value = self.format.de(&tokio::fs::read(entry.path()).await?)?;
69 config.plugins.insert(plugin_id, spec);
70 }
71 };
72 {
74 let dir_path = self.gateway_dir();
75 let ext = self.format.extension();
76 if let Ok(mut gateway_dir) = tokio::fs::read_dir(&dir_path).await.inspect_err(|e| {
77 tracing::debug!("retrieve gateway dir error: {e}");
78 }) {
79 let mut gateway_names = HashSet::new();
80 while let Some(entry) = gateway_dir.next_entry().await? {
81 let path = entry.path();
82 if (path.is_file() && path.extension() == Some(ext)) || path.is_dir() {
83 if let Some(gateway_name) = path.file_stem().and_then(OsStr::to_str) {
84 tracing::debug!("detected entry {gateway_name}");
85 gateway_names.insert(gateway_name.to_string());
86 }
87 }
88 }
89 for gateway_name in gateway_names {
90 if let Ok(Some(gateway)) = self.collect_gateway_item_config(&gateway_name).await.inspect_err(|e| tracing::debug!("fail to read gateway item: {e}")) {
91 config.gateways.insert(gateway_name, gateway);
92 }
93 }
94 }
95 }
96 tracing::trace!("config: {config:?}");
97 Ok(config)
98 }
99
100 pub async fn save_config(&self, config: Config) -> Result<(), BoxError> {
101 let Config { plugins, gateways, api_port } = config;
103 let main_config_to_save: Config = Config { api_port, ..Default::default() };
104 let b_main_config = self.format.ser(&main_config_to_save)?;
105 tokio::fs::write(self.entrance_config_path(), &b_main_config).await?;
106 if !plugins.is_empty() {
107 tokio::fs::create_dir_all(self.plugin_dir()).await?;
108 for (id, spec) in plugins.into_inner().into_iter() {
109 let path = self.plugin_path(&id);
110 let b_spec = self.format.ser(&spec)?;
111 tokio::fs::write(&path, &b_spec).await?;
112 }
113 }
114 for (gateway_name, item) in gateways.into_iter() {
115 let dir = self.gateway_dir().join(&gateway_name);
116 tokio::fs::create_dir_all(dir).await?;
117 let gateway_path = self.gateway_main_config_path(&gateway_name);
118 let b_gateway = self.format.ser(&ConfigItem {
119 gateway: item.gateway,
120 ..Default::default()
121 })?;
122 tokio::fs::write(&gateway_path, &b_gateway).await?;
123 let route_dir_path = self.routes_dir(&gateway_name);
124 tokio::fs::create_dir_all(&route_dir_path).await?;
125 for (route_name, route) in item.routes.into_iter() {
126 let route_path = self.route_path(&gateway_name, &route_name);
127 let b_route = self.format.ser(&route)?;
128 tokio::fs::write(&route_path, &b_route).await?;
129 }
130 }
131 Ok(())
132 }
133
134 pub async fn collect_gateway_item_config(&self, gateway_name: &str) -> Result<Option<ConfigItem>, BoxError> {
135 let dir_path = self.gateway_dir();
136 let ext = self.format.extension();
137 let mut main_config_path = self.gateway_main_config_path(gateway_name);
139 if !main_config_path.exists() {
140 main_config_path = dir_path.join(gateway_name).with_extension(ext);
142 }
143 if !main_config_path.exists() {
144 return Ok(None);
145 }
146 let mut main_config: ConfigItem = self.format.de(&tokio::fs::read(&main_config_path).await?)?;
147 let route_dir_path = self.routes_dir(gateway_name);
149 if route_dir_path.exists() {
150 let mut route_dir = tokio::fs::read_dir(self.routes_dir(gateway_name)).await?;
151 while let Some(entry) = route_dir.next_entry().await? {
152 let path = entry.path();
153 if path.is_file() && path.extension() == Some(ext) {
154 let Some(route_name) = path.file_stem().and_then(OsStr::to_str) else { continue };
155 if let Ok(route) = self.format.de::<SgHttpRoute>(&tokio::fs::read(&path).await?).inspect_err(|e| tracing::debug!("fail to read route config {path:?}: {e}")) {
156 main_config.routes.insert(route_name.to_string(), route);
157 }
158 }
159 }
160 }
161 Ok(Some(main_config))
162 }
163
164 pub async fn retrieve_cached<M, T>(&self, map: M) -> BoxResult<T>
165 where
166 M: FnOnce(&Config) -> T,
167 {
168 let config = self.collect_config().await?;
169 let result = map(&config);
170 Ok(result)
171 }
172 pub async fn modify_cached<M>(&self, map: M) -> BoxResult<()>
173 where
174 M: FnOnce(&mut Config) -> BoxResult<()>,
175 {
176 let mut config = self.collect_config().await?;
177 let result = map(&mut config);
178 if result.is_ok() {
179 tokio::fs::remove_dir_all(&self.dir).await?;
180 tokio::fs::create_dir_all(&self.dir).await?;
181 self.save_config(config).await?;
182 }
183 Ok(())
184 }
185
186 pub fn new<P: AsRef<Path>>(dir: P, format: F) -> Self {
187 Self {
188 buffer: Default::default(),
189 dir: Arc::from(dir.as_ref().to_owned()),
190 format,
191 prev_retrieve_time: Arc::new(RwLock::new(SystemTime::UNIX_EPOCH)),
192 cached: Default::default(), }
195 }
196 pub async fn clear_cache(&self) {
197 *self.prev_retrieve_time.write().await = SystemTime::UNIX_EPOCH;
198 }
199 pub fn gateway_suffix(&self) -> OsString {
200 let mut ext = OsString::from(GATEWAY_DIR);
201 ext.push(OsStr::from_bytes(b"."));
202 ext.push(self.format.extension());
203 ext
204 }
205
206 pub fn gateway_dir(&self) -> PathBuf {
207 self.dir.join(GATEWAY_DIR)
208 }
209 pub fn gateway_main_config_path(&self, gateway_name: &str) -> PathBuf {
210 self.gateway_dir().join(gateway_name).join(MODULE_FILE_NAME).with_extension(self.format.extension())
211 }
212
213 pub fn routes_dir(&self, gateway_name: &str) -> PathBuf {
214 self.gateway_dir().join(gateway_name).join(ROUTE_DIR)
215 }
216
217 pub fn route_path(&self, gateway_name: &str, route_name: &str) -> PathBuf {
218 self.routes_dir(gateway_name).join(route_name).with_extension(self.format.extension())
219 }
220
221 pub fn plugin_dir(&self) -> PathBuf {
222 self.dir.join(PLUGIN_DIR)
223 }
224
225 pub fn plugin_path(&self, id: &PluginInstanceId) -> PathBuf {
226 let file_stem = id.as_file_stem();
227 self.plugin_dir().join(file_stem).with_extension(self.format.extension())
228 }
229 pub fn extract_gateway_name_from_route_dir(&self, path: &Path) -> Option<String> {
230 if path.extension()? == OsStr::from_bytes(ROUTE_DIR.as_bytes()) {
231 path.file_stem().and_then(OsStr::to_str).map(|f| f.to_string())
232 } else {
233 None
234 }
235 }
236 pub fn extract_route_name(&self, path: &Path) -> Option<(String, String)> {
237 let gateway_name = self.extract_gateway_name_from_route_dir(path.parent()?)?;
238 if path.extension()? == self.format.extension() {
239 let route_name = path.file_stem().and_then(OsStr::to_str).map(|f| f.to_string())?;
240 Some((gateway_name, route_name))
241 } else {
242 None
243 }
244 }
245}
246
247mod create;
248mod delete;
249mod discovery;
250mod listen;
251mod retrieve;
252mod update;