spacegate_config/service/fs/
mod.rs

1use std::{
2    collections::HashSet,
3    ffi::{OsStr, OsString},
4    os::unix::ffi::OsStrExt,
5    path::{Path, PathBuf},
6    sync::Arc,
7    time::SystemTime,
8};
9pub mod model;
10use spacegate_model::{BoxError, BoxResult, Config, ConfigItem, PluginInstanceId, SgHttpRoute};
11use tokio::sync::{Mutex, RwLock};
12
13use crate::service::config_format::ConfigFormat;
14
/// Name of the directory, directly under the backend root, that holds gateway configs.
pub const GATEWAY_DIR: &str = "gateway";
/// Name of the directory inside a gateway's directory that holds its route files.
pub const ROUTE_DIR: &str = "route";
/// Name of the directory, directly under the backend root, that holds plugin specs.
pub const PLUGIN_DIR: &str = "plugin";
/// File stem shared by the root config file and each gateway's main config file.
pub const MODULE_FILE_NAME: &str = "config";
19/// # Filesystem Configuration Backend
20///
21/// ## Structure
22/// ``` no_rust
23/// |- config.json
24/// |- plugin/
25/// |  |- plugin_code.json
26/// |- gateway/
27/// |  |- admin/
28/// |  |  |- config.json
29/// |  |  |- route/
30/// |  |  |  |- static.json
31/// |  |  |  |- api.json
32/// |  |- app.json
33///
34/// ```
35#[derive(Debug, Clone)]
36pub struct Fs<F> {
37    pub dir: Arc<Path>,
38    pub format: F,
39    pub buffer: Arc<Mutex<Vec<u8>>>,
40    pub prev_retrieve_time: Arc<RwLock<SystemTime>>,
41    pub cached: Arc<RwLock<Config>>,
42    // /// None for always expire
43    // pub cache_expire: Option<Duration>,
44}
45
46impl<F> Fs<F>
47where
48    F: ConfigFormat,
49{
50    pub fn entrance_config_path(&self) -> PathBuf {
51        self.dir.join(MODULE_FILE_NAME).with_extension(self.format.extension())
52    }
53
54    pub async fn collect_config(&self) -> Result<Config, BoxError> {
55        tracing::trace!("retrieve config");
56        // main config file
57        let mut config = {
58            let main_config: Config = self.format.de(&tokio::fs::read(self.entrance_config_path()).await?)?;
59            main_config
60        };
61        // collect plugin
62        if let Ok(mut plugin_dir) = tokio::fs::read_dir(self.plugin_dir()).await.inspect_err(|e| tracing::debug!("fail to read plugin dir {e}")) {
63            while let Some(entry) = plugin_dir.next_entry().await? {
64                let Ok(file_name) = entry.file_name().into_string() else {
65                    continue;
66                };
67                let plugin_id = PluginInstanceId::from_file_stem(&file_name);
68                let spec: serde_json::Value = self.format.de(&tokio::fs::read(entry.path()).await?)?;
69                config.plugins.insert(plugin_id, spec);
70            }
71        };
72        // collect gateway
73        {
74            let dir_path = self.gateway_dir();
75            let ext = self.format.extension();
76            if let Ok(mut gateway_dir) = tokio::fs::read_dir(&dir_path).await.inspect_err(|e| {
77                tracing::debug!("retrieve gateway dir error: {e}");
78            }) {
79                let mut gateway_names = HashSet::new();
80                while let Some(entry) = gateway_dir.next_entry().await? {
81                    let path = entry.path();
82                    if (path.is_file() && path.extension() == Some(ext)) || path.is_dir() {
83                        if let Some(gateway_name) = path.file_stem().and_then(OsStr::to_str) {
84                            tracing::debug!("detected entry {gateway_name}");
85                            gateway_names.insert(gateway_name.to_string());
86                        }
87                    }
88                }
89                for gateway_name in gateway_names {
90                    if let Ok(Some(gateway)) = self.collect_gateway_item_config(&gateway_name).await.inspect_err(|e| tracing::debug!("fail to read gateway item: {e}")) {
91                        config.gateways.insert(gateway_name, gateway);
92                    }
93                }
94            }
95        }
96        tracing::trace!("config: {config:?}");
97        Ok(config)
98    }
99
100    pub async fn save_config(&self, config: Config) -> Result<(), BoxError> {
101        // save config
102        let Config { plugins, gateways, api_port } = config;
103        let main_config_to_save: Config = Config { api_port, ..Default::default() };
104        let b_main_config = self.format.ser(&main_config_to_save)?;
105        tokio::fs::write(self.entrance_config_path(), &b_main_config).await?;
106        if !plugins.is_empty() {
107            tokio::fs::create_dir_all(self.plugin_dir()).await?;
108            for (id, spec) in plugins.into_inner().into_iter() {
109                let path = self.plugin_path(&id);
110                let b_spec = self.format.ser(&spec)?;
111                tokio::fs::write(&path, &b_spec).await?;
112            }
113        }
114        for (gateway_name, item) in gateways.into_iter() {
115            let dir = self.gateway_dir().join(&gateway_name);
116            tokio::fs::create_dir_all(dir).await?;
117            let gateway_path = self.gateway_main_config_path(&gateway_name);
118            let b_gateway = self.format.ser(&ConfigItem {
119                gateway: item.gateway,
120                ..Default::default()
121            })?;
122            tokio::fs::write(&gateway_path, &b_gateway).await?;
123            let route_dir_path = self.routes_dir(&gateway_name);
124            tokio::fs::create_dir_all(&route_dir_path).await?;
125            for (route_name, route) in item.routes.into_iter() {
126                let route_path = self.route_path(&gateway_name, &route_name);
127                let b_route = self.format.ser(&route)?;
128                tokio::fs::write(&route_path, &b_route).await?;
129            }
130        }
131        Ok(())
132    }
133
134    pub async fn collect_gateway_item_config(&self, gateway_name: &str) -> Result<Option<ConfigItem>, BoxError> {
135        let dir_path = self.gateway_dir();
136        let ext = self.format.extension();
137        // 1. retrieve <gateway_name>.<ext>
138        let mut main_config_path = self.gateway_main_config_path(gateway_name);
139        if !main_config_path.exists() {
140            // 2. module config <gateway_name>/config.<ext>
141            main_config_path = dir_path.join(gateway_name).with_extension(ext);
142        }
143        if !main_config_path.exists() {
144            return Ok(None);
145        }
146        let mut main_config: ConfigItem = self.format.de(&tokio::fs::read(&main_config_path).await?)?;
147        // 3. collect route config
148        let route_dir_path = self.routes_dir(gateway_name);
149        if route_dir_path.exists() {
150            let mut route_dir = tokio::fs::read_dir(self.routes_dir(gateway_name)).await?;
151            while let Some(entry) = route_dir.next_entry().await? {
152                let path = entry.path();
153                if path.is_file() && path.extension() == Some(ext) {
154                    let Some(route_name) = path.file_stem().and_then(OsStr::to_str) else { continue };
155                    if let Ok(route) = self.format.de::<SgHttpRoute>(&tokio::fs::read(&path).await?).inspect_err(|e| tracing::debug!("fail to read route config {path:?}: {e}")) {
156                        main_config.routes.insert(route_name.to_string(), route);
157                    }
158                }
159            }
160        }
161        Ok(Some(main_config))
162    }
163
164    pub async fn retrieve_cached<M, T>(&self, map: M) -> BoxResult<T>
165    where
166        M: FnOnce(&Config) -> T,
167    {
168        let config = self.collect_config().await?;
169        let result = map(&config);
170        Ok(result)
171    }
172    pub async fn modify_cached<M>(&self, map: M) -> BoxResult<()>
173    where
174        M: FnOnce(&mut Config) -> BoxResult<()>,
175    {
176        let mut config = self.collect_config().await?;
177        let result = map(&mut config);
178        if result.is_ok() {
179            tokio::fs::remove_dir_all(&self.dir).await?;
180            tokio::fs::create_dir_all(&self.dir).await?;
181            self.save_config(config).await?;
182        }
183        Ok(())
184    }
185
186    pub fn new<P: AsRef<Path>>(dir: P, format: F) -> Self {
187        Self {
188            buffer: Default::default(),
189            dir: Arc::from(dir.as_ref().to_owned()),
190            format,
191            prev_retrieve_time: Arc::new(RwLock::new(SystemTime::UNIX_EPOCH)),
192            cached: Default::default(), // cache: RwLock::new(None),
193                                        // cache_expire: None,
194        }
195    }
196    pub async fn clear_cache(&self) {
197        *self.prev_retrieve_time.write().await = SystemTime::UNIX_EPOCH;
198    }
199    pub fn gateway_suffix(&self) -> OsString {
200        let mut ext = OsString::from(GATEWAY_DIR);
201        ext.push(OsStr::from_bytes(b"."));
202        ext.push(self.format.extension());
203        ext
204    }
205
206    pub fn gateway_dir(&self) -> PathBuf {
207        self.dir.join(GATEWAY_DIR)
208    }
209    pub fn gateway_main_config_path(&self, gateway_name: &str) -> PathBuf {
210        self.gateway_dir().join(gateway_name).join(MODULE_FILE_NAME).with_extension(self.format.extension())
211    }
212
213    pub fn routes_dir(&self, gateway_name: &str) -> PathBuf {
214        self.gateway_dir().join(gateway_name).join(ROUTE_DIR)
215    }
216
217    pub fn route_path(&self, gateway_name: &str, route_name: &str) -> PathBuf {
218        self.routes_dir(gateway_name).join(route_name).with_extension(self.format.extension())
219    }
220
221    pub fn plugin_dir(&self) -> PathBuf {
222        self.dir.join(PLUGIN_DIR)
223    }
224
225    pub fn plugin_path(&self, id: &PluginInstanceId) -> PathBuf {
226        let file_stem = id.as_file_stem();
227        self.plugin_dir().join(file_stem).with_extension(self.format.extension())
228    }
229    pub fn extract_gateway_name_from_route_dir(&self, path: &Path) -> Option<String> {
230        if path.extension()? == OsStr::from_bytes(ROUTE_DIR.as_bytes()) {
231            path.file_stem().and_then(OsStr::to_str).map(|f| f.to_string())
232        } else {
233            None
234        }
235    }
236    pub fn extract_route_name(&self, path: &Path) -> Option<(String, String)> {
237        let gateway_name = self.extract_gateway_name_from_route_dir(path.parent()?)?;
238        if path.extension()? == self.format.extension() {
239            let route_name = path.file_stem().and_then(OsStr::to_str).map(|f| f.to_string())?;
240            Some((gateway_name, route_name))
241        } else {
242            None
243        }
244    }
245}
246
247mod create;
248mod delete;
249mod discovery;
250mod listen;
251mod retrieve;
252mod update;