apollo_framework/config.rs
1//! Contains the system configuration.
2//!
3//! Provides access to the system configuration which is loaded from the **config/settings.yml** file.
4//! Note that we observe this file for changes and reload it once a change is detected. Therefore
5//! each user of the config should attach itself to the [Config::notifier](Config::notifier) and
6//! re-process the config once a change message is received.
7//!
8//! Being an in-memory database framework we want to prevent restarts / downtimes as much as
9//! possible.
10//!
11//! Note that the **Config** struct is kind of constant and can be obtained from the **Platform**
12//! once and then kept around. However, when using **Config::current()** to obtain the current
13//! config handle, this should not be stored, as it will not be updated once a new config has
14//! been loaded.
15//!
16//! # Examples
17//!
18//! Obtaining and reading the config:
19//! ```
20//! # use apollo_framework::config;
21//! # use apollo_framework::config::Config;
22//! # use apollo_framework::platform::Platform;
23//! # #[tokio::main]
24//! # async fn main() {
25//!
26//! // Create a platform and install the config...
27//! let platform = Platform::new();
28//! let config = config::install(platform.clone(), false).await;
29//!
30//! // Fetch the current config document (might be reloaded from time to time) and read the
31//! // selected server port..
32//! let port = config.current().config()["server"]["port"].as_i64().unwrap_or(1234);
33//! # }
34//! ```
35//!
36//! Attaching a change listener:
37//! ```no_run
38//! # use apollo_framework::config;
39//! # use apollo_framework::config::Config;
40//! # use apollo_framework::platform::Platform;
41//! # #[tokio::main]
42//! # async fn main() {
43//!
44//! // Create a platform and install the config...
45//! let platform = Platform::new();
//! let config = config::install(platform.clone(), false).await;
47//!
48//! // Obtain the config...
49//! let config = platform.require::<Config>();
50//! tokio::spawn(async move {
51//! loop {
52//! // Wait for a config change. This will most probably be combined with a read from
53//! // a command queue or another feature using tokio::select!...
54//! match config.notifier().recv().await {
55//! Ok(_) => log::info!("Config update received..."),
56//! _ => return,
57//! }
58//! }
59//! });
60//! # }
61//! ```
62use std::sync::Arc;
63use std::time::SystemTime;
64
65use arc_swap::ArcSwap;
66use yaml_rust::{Yaml, YamlLoader};
67
68use crate::platform::Platform;
69use anyhow::Context;
70use std::path::Path;
71
72/// Provides access to the system configuration.
73///
74/// Most probably a config instance is installed by the [Builder](crate::builder::Builder) and
75/// can be obtained via `platform.require::<Config>()`. Note that it is highly recommended to
76/// register a change listener by calling `Config::notifier()` as we expect all components to pick
77/// up config changes without restarting the application.
78pub struct Config {
79 filename: String,
80 tx: tokio::sync::broadcast::Sender<()>,
81 config: ArcSwap<(Yaml, Option<SystemTime>)>,
82}
83
84/// Represents the change listener.
85///
86/// Internally this is simply the receiver of a broadcast. The actual message being broadcast
87/// can an should be ignored. All that matters is, once a message has been received, the config
88/// was changed and needs to be re-processed.
89pub type ChangeNotifier = tokio::sync::broadcast::Receiver<()>;
90
91/// Represents a handle to the currently loaded configuration.
92///
93/// Note that this handle should not be stored or kept around for long, as it will not be updated
94/// if the underlying config changed.
95pub struct Handle {
96 config: Arc<(Yaml, Option<SystemTime>)>,
97}
98
99impl Config {
100 /// Creates a new config reading the given file.
101 ///
102 /// Note that this will not install a change listener. This is only done by the
103 /// [install](install) function.
104 pub fn new(file: &str) -> Self {
105 let (tx, _) = tokio::sync::broadcast::channel(1);
106 Config {
107 filename: file.to_owned(),
108 config: ArcSwap::new(Arc::new((Yaml::Null, None))),
109 tx,
110 }
111 }
112
113 /// Obtains a change notifier which receives a message once the config changed.
114 pub fn notifier(&self) -> ChangeNotifier {
115 self.tx.subscribe()
116 }
117
118 /// Obtains a handle to the currently loaded configuration.
119 ///
120 /// Note that this is a fairly efficient operation but still provides some overhead. Therefore
121 /// this shouldn't be placed in an inner loop.
122 pub fn current(&self) -> Handle {
123 Handle {
124 config: self.config.load_full(),
125 }
126 }
127
128 /// Determines the last modified date of the config file on disk.
129 ///
130 /// As within docker, the file is presented as volume, we check that it is a file, as an
131 /// unmounted docker volume is always presented as directory.
132 async fn last_modified(&self) -> Option<SystemTime> {
133 tokio::fs::metadata(&self.filename)
134 .await
135 .ok()
136 .filter(|meta| meta.is_file())
137 .and_then(|meta| meta.modified().ok())
138 }
139
140 /// Forces the config to read the underlying file.
141 ///
142 /// Note that this is normally called by the framework and should not be invoked manually.
143 pub async fn load(&self) -> anyhow::Result<()> {
144 log::info!("Loading config file {}...", &self.filename);
145
146 if let Ok(metadata) = tokio::fs::metadata(&self.filename).await {
147 if !metadata.is_file() {
148 log::info!("Config file doesn't exist or is an unmounted docker volume - skipping config load.");
149 return Ok(());
150 }
151 }
152
153 let config_data = match tokio::fs::read_to_string(&self.filename).await {
154 Ok(data) => data,
155 Err(error) => {
156 return Err(anyhow::anyhow!(
157 "Cannot load config file {}: {}",
158 &self.filename,
159 error
160 ));
161 }
162 };
163
164 let last_modified = tokio::fs::metadata(&self.filename)
165 .await
166 .ok()
167 .and_then(|metadata| metadata.modified().ok());
168
169 self.load_from_string(config_data.as_str(), last_modified)
170 }
171
172 /// Loads a configuration from the given string instead of a file.
173 ///
174 /// This is intended to be used in test environments where we cannot / do not want to load
175 /// a config file from disk.
176 ///
177 /// # Example
178 ///
179 /// ```
180 /// # use apollo_framework::config::Config;
181 /// # use std::time::Instant;
182 /// #
183 /// # #[tokio::main]
184 /// # async fn main() {
185 /// let config = Config::new("apollo_test_config.yml");
186 ///
187 /// // Remove any left over file...
188 /// std::fs::remove_file("apollo_test_config.yml");
189 ///
190 /// // Write a config file...
191 /// assert_eq!(config.store("
192 /// server:
193 /// port: 12345
194 /// ").await.is_ok(), true);
195 ///
196 /// // Load it back and verify its contents (in a fully running Apollo, this would
197 /// // happen automatically via the config watcher...)
198 /// assert_eq!(config.load().await.is_ok(), true);
199 /// assert_eq!(config.current().config()["server"]["port"].as_i64().unwrap(), 12345);
200 ///
201 /// // Writing an invalid config file is prevented...
202 /// assert_eq!(config.store("server: \"test").await.is_err(), true);
203 ///
204 /// // Therefore the original config is still present...
205 /// assert_eq!(config.load().await.is_ok(), true);
206 /// assert_eq!(config.current().config()["server"]["port"].as_i64().unwrap(), 12345);
207 /// # }
208 /// ```
209 pub async fn store(&self, config: &str) -> anyhow::Result<()> {
210 log::info!(
211 "Programmatically updating the config file {}...",
212 &self.filename
213 );
214
215 if let Err(error) = YamlLoader::load_from_str(config) {
216 Err(anyhow::anyhow!("Cannot parse config data: {}", error))
217 } else {
218 tokio::fs::write(&self.filename, config)
219 .await
220 .context("Failed to write to config file!")?;
221 log::info!("Config has been updated successfully!");
222
223 // We have nothing let to do, as we will automatically detect the change and reload the
224 // config...
225 Ok(())
226 }
227 }
228
229 /// Loads a configuration from the given string instead of a file.
230 ///
231 /// This is intended to be used in test environments where we cannot / do not want to load
232 /// a config file from disk.
233 ///
234 /// # Example
235 ///
236 /// ```
237 /// # use apollo_framework::config::Config;
238 /// use std::time::Instant;
239 /// let config = Config::new("somefile.yml");
240 /// config.load_from_string("
241 /// server:
242 /// port: 12345
243 /// ", None);
244 ///
245 /// assert_eq!(config.current().config()["server"]["port"].as_i64().unwrap(), 12345);
246 /// ```
247 pub fn load_from_string(
248 &self,
249 data: &str,
250 last_modified: Option<SystemTime>,
251 ) -> anyhow::Result<()> {
252 let docs = match YamlLoader::load_from_str(data) {
253 Ok(docs) => docs,
254 Err(error) => {
255 return Err(anyhow::anyhow!(
256 "Cannot parse config file {}: {}",
257 &self.filename,
258 error
259 ));
260 }
261 };
262
263 // Store update config...
264 self.config.store(Arc::new((
265 docs.get(0).unwrap_or(&Yaml::Null).clone(),
266 last_modified,
267 )));
268
269 // Notify all listeners - we ignore if there are none...
270 let _ = self.tx.clone().send(());
271
272 Ok(())
273 }
274}
275
276impl Handle {
277 /// Provides access to the currently loaded configuration.
278 pub fn config(&self) -> &Yaml {
279 &self.config.0
280 }
281}
282
283/// Creates an installs a **Config** for the given platform.
284///
285/// This will read its contents from **config/settings.yml** and also install a change listener for
286/// this file. Note that this listener will only watch the "last modified" date of the file and will
287/// not perform a structural comparison. Therefore it is the duty of each config user to gracefully
288/// handle partial config changes.
289///
290/// Note that this method is also called by the [Builder](crate::builder::Builder) unless the
291/// **Config** part is disabled.
292pub async fn install(platform: Arc<Platform>, auto_reload: bool) -> Arc<Config> {
293 // Create the "config" directory in case it doesn't exist...
294 let path = Path::new("config").to_path_buf();
295 if let Err(error) = tokio::fs::create_dir_all(path.clone()).await {
296 log::warn!(
297 "Failed to create config base directory {}: {}",
298 path.to_string_lossy(),
299 error
300 )
301 }
302 // Install a config instance and point it to "settings.yml"..
303 let config = Arc::new(Config::new("config/settings.yml"));
304 platform.register::<Config>(config.clone());
305
306 // Actually try to read the file...
307 if let Err(error) = config.load().await {
308 log::error!("{}", error);
309 }
310
311 // Install a change listener which runs every 2s...
312 if auto_reload {
313 run_config_change_monitor(platform, config.clone());
314 }
315
316 config
317}
318
319fn run_config_change_monitor(platform: Arc<Platform>, config: Arc<Config>) {
320 let _ = tokio::spawn(async move {
321 while platform.is_running() {
322 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
323 // This will contain the last modified date of the file on disk or be None if the
324 // file is absent...
325 let last_modified = config.last_modified().await;
326
327 // Contains the timestamp when the file was loaded the last time or be None if no
328 // data has been loaded yet...
329 let last_loaded = config.config.load().1;
330
331 // If a file is present and newer that the one previously loaded (or if none has been
332 // loaded so far) -> perform a reload and broadcast an update if the file has been
333 // successfully loaded...
334 if last_modified.is_some() && (last_loaded.is_none() || last_modified > last_loaded) {
335 match config.load().await {
336 Ok(_) => {
337 log::info!("System configuration was re-loaded.");
338 }
339 Err(error) => log::error!("Failed to re-load system config: {}", error),
340 }
341 }
342 }
343 });
344}
345
#[cfg(test)]
mod tests {
    use crate::platform::Platform;
    use std::time::SystemTime;

    /// Ensures that loading a new config replaces the old one and notifies listeners, whereas
    /// a malformed config is rejected and leaves the previous config in place.
    #[test]
    fn ensure_config_update_works() {
        crate::testing::test_async(async {
            let platform = Platform::new();
            let config = crate::config::install(platform.clone(), false).await;

            // Start out with a known config...
            config
                .load_from_string("test: 42", Some(SystemTime::now()))
                .unwrap();

            // Subscribe before spawning, so that the listener is guaranteed to observe the
            // next change. The task forwards the notification to a oneshot channel...
            let mut change_notifier = config.notifier();
            let (tx, rx) = tokio::sync::oneshot::channel();
            let _ = tokio::spawn(async move {
                if change_notifier.recv().await.is_ok() {
                    tx.send(()).unwrap();
                }
            });

            // The initial config has to be visible...
            assert_eq!(config.current().config()["test"].as_i64().unwrap(), 42);

            // A malformed config has to be rejected...
            let malformed = config.load_from_string("test: 'invalid", Some(SystemTime::now()));
            assert!(malformed.is_err());

            // ...and must not replace the initial config.
            assert_eq!(config.current().config()["test"].as_i64().unwrap(), 42);

            // Now actually change the config...
            config
                .load_from_string("test: 4242", Some(SystemTime::now()))
                .unwrap();

            // ...and wait for the listener to report the change.
            if rx.await.is_err() {
                panic!("Received invalid value...");
            }

            // Finally, the new config has to be visible...
            assert_eq!(config.current().config()["test"].as_i64().unwrap(), 4242);
        });
    }
}
401}