datafusion_server/
settings.rs

1// settings.rs: Application wide system settings by configuration file
2// Sasaki, Naoki <nsasaki@sal.co.jp> December 31, 2022
3//
4
5use std::path::Path;
6
7use config::{
8    builder::DefaultState,
9    {Config, ConfigBuilder, ConfigError, File},
10};
11use log::Level;
12use once_cell::sync::OnceCell;
13use serde::Deserialize;
14
15#[cfg(any(feature = "postgres", feature = "mysql"))]
16use crate::data_source::database::database_manager;
17use crate::data_source::object_store::credential_manager;
18
19#[derive(Debug, Deserialize, Clone)]
20pub struct Server {
21    pub address: String,
22    pub port: u16,
23    pub flight_address: String,
24    pub flight_grpc_port: u16,
25    pub metrics_address: String,
26    pub metrics_port: u16,
27    pub base_url: String,
28    pub data_dir: String,
29    pub plugin_dir: String,
30    pub disable_stateful_features: bool,
31}
32
33#[derive(Debug, Deserialize, Clone)]
34pub struct Session {
35    pub default_keep_alive: i64,  // in seconds
36    pub upload_limit_size: usize, // in MB
37}
38
39#[derive(Debug, Deserialize, Clone)]
40pub struct Log {
41    pub level: String,
42}
43
44impl Log {
45    #[must_use]
46    pub fn level(&self) -> Option<Level> {
47        match &*self.level.to_lowercase() {
48            "trace" => Some(Level::Trace),
49            "debug" => Some(Level::Debug),
50            "info" => Some(Level::Info),
51            "warn" => Some(Level::Warn),
52            "error" => Some(Level::Error),
53            _ => None,
54        }
55    }
56}
57
58#[cfg(feature = "postgres")]
59#[derive(Debug, Deserialize, Clone)]
60pub struct DatabaseConfigPostgres {
61    pub namespace: Option<String>,
62    pub user: String,
63    pub password: String,
64    pub host: String,
65    pub port: Option<u16>,
66    pub database: String,
67    pub ssl_mode: Option<String>,
68    pub max_connections: Option<u32>,
69    pub enable_schema_cache: Option<bool>,
70    pub description: Option<String>,
71}
72
73#[cfg(feature = "mysql")]
74#[derive(Debug, Deserialize, Clone)]
75pub struct DatabaseConfigMySQL {
76    pub namespace: Option<String>,
77    pub user: String,
78    pub password: String,
79    pub host: String,
80    pub port: Option<u16>,
81    pub database: String,
82    pub ssl_mode: Option<String>,
83    pub max_connections: Option<u32>,
84    pub enable_schema_cache: Option<bool>,
85    pub description: Option<String>,
86}
87
88#[cfg(any(feature = "postgres", feature = "mysql"))]
89#[derive(Debug, Deserialize, Clone)]
90#[serde(tag = "type", rename_all = "lowercase")]
91pub enum Database {
92    #[cfg(feature = "postgres")]
93    Postgres(DatabaseConfigPostgres),
94    #[cfg(feature = "mysql")]
95    MySQL(DatabaseConfigMySQL),
96}
97
98#[cfg(any(feature = "postgres", feature = "mysql"))]
99impl Database {
100    #[must_use]
101    pub fn scheme(&self) -> &str {
102        match self {
103            #[cfg(feature = "postgres")]
104            Database::Postgres(_) => "postgres",
105            #[cfg(feature = "mysql")]
106            Database::MySQL(_) => "mysql",
107        }
108    }
109}
110
111#[derive(Debug, Deserialize, Clone)]
112pub struct StorageAws {
113    pub access_key_id: String,
114    pub secret_access_key: String,
115    pub bucket: String,
116    pub region: String,
117    pub description: Option<String>,
118}
119
120#[derive(Debug, Deserialize, Clone)]
121pub struct StorageGcp {
122    pub service_account_key: String,
123    pub bucket: String,
124    pub description: Option<String>,
125}
126
127#[derive(Debug, Deserialize, Clone)]
128pub struct StorageAzure {
129    pub account_name: String,
130    pub access_key: String,
131    pub container: String,
132    pub description: Option<String>,
133}
134
135#[cfg(feature = "webdav")]
136#[derive(Debug, Deserialize, Clone)]
137pub struct StorageHttp {
138    pub url: String,
139    pub user: Option<String>,
140    pub password: Option<String>,
141    pub description: Option<String>,
142}
143
144#[derive(Debug, Deserialize, Clone)]
145#[serde(tag = "type", rename_all = "lowercase")]
146pub enum Storage {
147    Aws(StorageAws),
148    Gcp(StorageGcp),
149    Azure(StorageAzure),
150    #[cfg(feature = "webdav")]
151    Webdav(StorageHttp),
152}
153
154#[derive(Deserialize, Clone)]
155pub struct Settings {
156    pub server: Server,
157    pub session: Session,
158    pub log: Log,
159    #[cfg(any(feature = "postgres", feature = "mysql"))]
160    pub databases: Option<Vec<Database>>,
161    pub storages: Option<Vec<Storage>>,
162    #[cfg(any(feature = "postgres", feature = "mysql"))]
163    #[serde(skip)]
164    pub database_pool_manager: database_manager::DatabaseManager,
165    #[serde(skip)]
166    pub object_store_manager: credential_manager::ObjectStoreManager,
167}
168
169pub static LAZY_SETTINGS: OnceCell<Settings> = OnceCell::new();
170
171impl Settings {
172    /// ## Errors
173    /// Can not convert from `Path` to UTF-8 string.
174    pub fn new_with_file(config_file: &Path) -> Result<Self, ConfigError> {
175        Self::defaults()
176            .add_source(File::with_name(config_file.to_str().ok_or(
177                ConfigError::Message(format!("Broken utf-8 file name: {}", config_file.display())),
178            )?))
179            .build()?
180            .try_deserialize()
181    }
182
183    /// ## Errors
184    /// Can not creates configuration variables.
185    pub fn new() -> Result<Self, ConfigError> {
186        Self::defaults().build()?.try_deserialize()
187    }
188
189    fn defaults() -> ConfigBuilder<DefaultState> {
190        Config::builder()
191            .set_default("server.address", "0.0.0.0")
192            .unwrap()
193            .set_default("server.port", 4000)
194            .unwrap()
195            .set_default("server.flight_address", "0.0.0.0")
196            .unwrap()
197            .set_default("server.flight_grpc_port", 50051)
198            .unwrap()
199            .set_default("server.metrics_address", "127.0.0.1")
200            .unwrap()
201            .set_default("server.metrics_port", 9100)
202            .unwrap()
203            .set_default("server.base_url", "/")
204            .unwrap()
205            .set_default("server.data_dir", "data")
206            .unwrap()
207            .set_default("server.plugin_dir", "plugin")
208            .unwrap()
209            .set_default("server.disable_stateful_features", false)
210            .unwrap()
211            .set_default("session.default_keep_alive", 3600)
212            .unwrap()
213            .set_default("session.upload_limit_size", 20) // 20MB
214            .unwrap()
215            .set_default("log.level", "info")
216            .unwrap()
217    }
218
219    /// ## Errors
220    /// Can not initialize object store credentials and external database connection pools.
221    pub fn init_global_managers(mut self) -> Result<Self, ConfigError> {
222        #[cfg(any(feature = "postgres", feature = "mysql"))]
223        {
224            self.database_pool_manager =
225                database_manager::DatabaseManager::new_with_config(self.databases.as_ref())
226                    .map_err(|e| {
227                        ConfigError::Message(format!(
228                            "Can not initialize database connection pools: {e}"
229                        ))
230                    })?;
231        }
232
233        self.object_store_manager =
234            credential_manager::ObjectStoreManager::new_with_config(self.storages.as_ref())
235                .map_err(|e| {
236                    ConfigError::Message(format!(
237                        "Can not initialize object store credentials: {e}"
238                    ))
239                })?;
240
241        Ok(self)
242    }
243
244    /// ## Panics
245    /// Configuration variables has not been initialized.
246    pub fn global() -> &'static Settings {
247        LAZY_SETTINGS.get().expect("Settings is not initialized")
248    }
249
250    #[must_use]
251    pub fn debug(&self) -> String {
252        let mut result = format!("{:?}, {:?}, {:?}", self.server, self.session, self.log);
253
254        #[cfg(any(feature = "postgres", feature = "mysql"))]
255        {
256            let databases: Vec<_> = self
257                .database_pool_manager
258                .resolvers
259                .keys()
260                .cloned()
261                .collect();
262            result = format!("{result}, Database {{ namespaces: {databases:?} }}");
263        }
264
265        let stores: Vec<_> = self.object_store_manager.stores.keys().cloned().collect();
266        result = format!("{result}, Storage {{ stores: {stores:?} }}");
267
268        result
269    }
270}