apollo_framework/
config.rs

1//! Contains the system configuration.
2//!
3//! Provides access to the system configuration which is loaded from the **config/settings.yml** file.
4//! Note that we observe this file for changes and reload it once a change is detected. Therefore
5//! each user of the config should attach itself to the [Config::notifier](Config::notifier) and
6//! re-process the config once a change message is received.
7//!
8//! Being an in-memory database framework we want to prevent restarts / downtimes as much as
9//! possible.
10//!
11//! Note that the **Config** struct is kind of constant and can be obtained from the **Platform**
12//! once and then kept around. However, when using **Config::current()** to obtain the current
13//! config handle, this should not be stored, as it will not be updated once a new config has
14//! been loaded.
15//!
16//! # Examples
17//!
18//! Obtaining and reading the config:
19//! ```
20//! # use apollo_framework::config;
21//! # use apollo_framework::config::Config;
22//! # use apollo_framework::platform::Platform;
23//! # #[tokio::main]
24//! # async fn main() {
25//!
26//! // Create a platform and install the config...
27//! let platform = Platform::new();
28//! let config = config::install(platform.clone(), false).await;
29//!
30//! // Fetch the current config document (might be reloaded from time to time) and read the
31//! // selected server port..
32//! let port = config.current().config()["server"]["port"].as_i64().unwrap_or(1234);
33//! # }
34//! ```
35//!
36//! Attaching a change listener:
37//! ```no_run
38//! # use apollo_framework::config;
39//! # use apollo_framework::config::Config;
40//! # use apollo_framework::platform::Platform;
41//! # #[tokio::main]
42//! # async fn main() {
43//!
44//! // Create a platform and install the config...
45//! let platform = Platform::new();
//! let config = config::install(platform.clone(), false).await;
47//!
48//! // Obtain the config...
49//! let config = platform.require::<Config>();
50//! tokio::spawn(async move {
51//!     loop {
52//!         // Wait for a config change. This will most probably be combined with a read from
53//!         // a command queue or another feature using tokio::select!...
54//!         match config.notifier().recv().await {
55//!             Ok(_) => log::info!("Config update received..."),
56//!             _ => return,
57//!         }       
58//!     }
59//! });
60//! # }
61//! ```
62use std::sync::Arc;
63use std::time::SystemTime;
64
65use arc_swap::ArcSwap;
66use yaml_rust::{Yaml, YamlLoader};
67
68use crate::platform::Platform;
69use anyhow::Context;
70use std::path::Path;
71
/// Provides access to the system configuration.
///
/// Most probably a config instance is installed by the [Builder](crate::builder::Builder) and
/// can be obtained via `platform.require::<Config>()`. Note that it is highly recommended to
/// register a change listener by calling `Config::notifier()` as we expect all components to pick
/// up config changes without restarting the application.
pub struct Config {
    /// Path of the YAML file which backs this config.
    filename: String,
    /// Sending half of the broadcast channel on which a message is emitted whenever a new
    /// config document has been put in place.
    tx: tokio::sync::broadcast::Sender<()>,
    /// The currently active document along with the "last modified" timestamp which was
    /// supplied when it was loaded (`None` as long as no timestamp is known).
    config: ArcSwap<(Yaml, Option<SystemTime>)>,
}
83
/// Represents the change listener.
///
/// Internally this is simply the receiver of a broadcast. The actual message being broadcast
/// can and should be ignored. All that matters is that, once a message has been received, the
/// config was changed and needs to be re-processed.
pub type ChangeNotifier = tokio::sync::broadcast::Receiver<()>;
90
/// Represents a handle to the currently loaded configuration.
///
/// Note that this handle should not be stored or kept around for long, as it will not be updated
/// if the underlying config changed.
pub struct Handle {
    /// Shared snapshot of the document (and its timestamp) taken when the handle was created.
    config: Arc<(Yaml, Option<SystemTime>)>,
}
98
99impl Config {
100    /// Creates a new config reading the given file.
101    ///
102    /// Note that this will not install a change listener. This is only done by the
103    /// [install](install) function.
104    pub fn new(file: &str) -> Self {
105        let (tx, _) = tokio::sync::broadcast::channel(1);
106        Config {
107            filename: file.to_owned(),
108            config: ArcSwap::new(Arc::new((Yaml::Null, None))),
109            tx,
110        }
111    }
112
    /// Obtains a change notifier which receives a message once the config changed.
    ///
    /// Each call subscribes a new receiver to the internal broadcast sender.
    pub fn notifier(&self) -> ChangeNotifier {
        self.tx.subscribe()
    }
117
118    /// Obtains a handle to the currently loaded configuration.
119    ///
120    /// Note that this is a fairly efficient operation but still provides some overhead. Therefore
121    /// this shouldn't be placed in an inner loop.
122    pub fn current(&self) -> Handle {
123        Handle {
124            config: self.config.load_full(),
125        }
126    }
127
128    /// Determines the last modified date of the config file on disk.
129    ///
130    /// As within docker, the file is presented as volume, we check that it is a file, as an
131    /// unmounted docker volume is always presented as directory.
132    async fn last_modified(&self) -> Option<SystemTime> {
133        tokio::fs::metadata(&self.filename)
134            .await
135            .ok()
136            .filter(|meta| meta.is_file())
137            .and_then(|meta| meta.modified().ok())
138    }
139
140    /// Forces the config to read the underlying file.
141    ///
142    /// Note that this is normally called by the framework and should not be invoked manually.
143    pub async fn load(&self) -> anyhow::Result<()> {
144        log::info!("Loading config file {}...", &self.filename);
145
146        if let Ok(metadata) = tokio::fs::metadata(&self.filename).await {
147            if !metadata.is_file() {
148                log::info!("Config file doesn't exist or is an unmounted docker volume - skipping config load.");
149                return Ok(());
150            }
151        }
152
153        let config_data = match tokio::fs::read_to_string(&self.filename).await {
154            Ok(data) => data,
155            Err(error) => {
156                return Err(anyhow::anyhow!(
157                    "Cannot load config file {}: {}",
158                    &self.filename,
159                    error
160                ));
161            }
162        };
163
164        let last_modified = tokio::fs::metadata(&self.filename)
165            .await
166            .ok()
167            .and_then(|metadata| metadata.modified().ok());
168
169        self.load_from_string(config_data.as_str(), last_modified)
170    }
171
172    /// Loads a configuration from the given string instead of a file.
173    ///
174    /// This is intended to be used in test environments where we cannot / do not want to load
175    /// a config file from disk.
176    ///
177    /// # Example
178    ///
179    /// ```
180    /// # use apollo_framework::config::Config;
181    /// # use std::time::Instant;
182    /// #
183    /// # #[tokio::main]
184    /// # async fn main() {
185    /// let config = Config::new("apollo_test_config.yml");
186    ///
187    /// // Remove any left over file...
188    /// std::fs::remove_file("apollo_test_config.yml");
189    ///
190    /// // Write a config file...
191    /// assert_eq!(config.store("
192    /// server:
193    ///     port: 12345
194    /// ").await.is_ok(), true);
195    ///
196    /// // Load it back and verify its contents (in a fully running Apollo, this would
197    /// // happen automatically via the config watcher...)
198    /// assert_eq!(config.load().await.is_ok(), true);
199    /// assert_eq!(config.current().config()["server"]["port"].as_i64().unwrap(), 12345);
200    ///
201    /// // Writing an invalid config file is prevented...
202    /// assert_eq!(config.store("server: \"test").await.is_err(), true);
203    ///
204    /// // Therefore the original config is still present...
205    /// assert_eq!(config.load().await.is_ok(), true);
206    /// assert_eq!(config.current().config()["server"]["port"].as_i64().unwrap(), 12345);
207    /// # }
208    /// ```
209    pub async fn store(&self, config: &str) -> anyhow::Result<()> {
210        log::info!(
211            "Programmatically updating the config file {}...",
212            &self.filename
213        );
214
215        if let Err(error) = YamlLoader::load_from_str(config) {
216            Err(anyhow::anyhow!("Cannot parse config data: {}", error))
217        } else {
218            tokio::fs::write(&self.filename, config)
219                .await
220                .context("Failed to write to config file!")?;
221            log::info!("Config has been updated successfully!");
222
223            // We have nothing let to do, as we will automatically detect the change and reload the
224            // config...
225            Ok(())
226        }
227    }
228
229    /// Loads a configuration from the given string instead of a file.
230    ///
231    /// This is intended to be used in test environments where we cannot / do not want to load
232    /// a config file from disk.
233    ///
234    /// # Example
235    ///
236    /// ```
237    /// # use apollo_framework::config::Config;
238    /// use std::time::Instant;
239    /// let config = Config::new("somefile.yml");
240    /// config.load_from_string("
241    /// server:
242    ///     port: 12345
243    /// ", None);
244    ///
245    /// assert_eq!(config.current().config()["server"]["port"].as_i64().unwrap(), 12345);
246    /// ```
247    pub fn load_from_string(
248        &self,
249        data: &str,
250        last_modified: Option<SystemTime>,
251    ) -> anyhow::Result<()> {
252        let docs = match YamlLoader::load_from_str(data) {
253            Ok(docs) => docs,
254            Err(error) => {
255                return Err(anyhow::anyhow!(
256                    "Cannot parse config file {}: {}",
257                    &self.filename,
258                    error
259                ));
260            }
261        };
262
263        // Store update config...
264        self.config.store(Arc::new((
265            docs.get(0).unwrap_or(&Yaml::Null).clone(),
266            last_modified,
267        )));
268
269        // Notify all listeners - we ignore if there are none...
270        let _ = self.tx.clone().send(());
271
272        Ok(())
273    }
274}
275
276impl Handle {
277    /// Provides access to the currently loaded configuration.
278    pub fn config(&self) -> &Yaml {
279        &self.config.0
280    }
281}
282
283/// Creates an installs a **Config** for the given platform.
284///
285/// This will read its contents from **config/settings.yml** and also install a change listener for
286/// this file. Note that this listener will only watch the "last modified" date of the file and will
287/// not perform a structural comparison. Therefore it is the duty of each config user to gracefully
288/// handle partial config changes.
289///
290/// Note that this method is also called by the [Builder](crate::builder::Builder) unless the
291/// **Config** part is disabled.
292pub async fn install(platform: Arc<Platform>, auto_reload: bool) -> Arc<Config> {
293    // Create the "config" directory in case it doesn't exist...
294    let path = Path::new("config").to_path_buf();
295    if let Err(error) = tokio::fs::create_dir_all(path.clone()).await {
296        log::warn!(
297            "Failed to create config base directory {}: {}",
298            path.to_string_lossy(),
299            error
300        )
301    }
302    // Install a config instance and point it to "settings.yml"..
303    let config = Arc::new(Config::new("config/settings.yml"));
304    platform.register::<Config>(config.clone());
305
306    // Actually try to read the file...
307    if let Err(error) = config.load().await {
308        log::error!("{}", error);
309    }
310
311    // Install a change listener which runs every 2s...
312    if auto_reload {
313        run_config_change_monitor(platform, config.clone());
314    }
315
316    config
317}
318
319fn run_config_change_monitor(platform: Arc<Platform>, config: Arc<Config>) {
320    let _ = tokio::spawn(async move {
321        while platform.is_running() {
322            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
323            // This will contain the last modified date of the file on disk or be None if the
324            // file is absent...
325            let last_modified = config.last_modified().await;
326
327            // Contains the timestamp when the file was loaded the last time or be None if no
328            // data has been loaded yet...
329            let last_loaded = config.config.load().1;
330
331            // If a file is present and newer that the one previously loaded (or if none has been
332            // loaded so far) -> perform a reload and broadcast an update if the file has been
333            // successfully loaded...
334            if last_modified.is_some() && (last_loaded.is_none() || last_modified > last_loaded) {
335                match config.load().await {
336                    Ok(_) => {
337                        log::info!("System configuration was re-loaded.");
338                    }
339                    Err(error) => log::error!("Failed to re-load system config: {}", error),
340                }
341            }
342        }
343    });
344}
345
#[cfg(test)]
mod tests {
    use crate::platform::Platform;
    use std::time::SystemTime;

    /// Ensures that loading a config notifies listeners and that malformed data leaves the
    /// previously loaded config untouched.
    #[test]
    fn ensure_config_update_works() {
        crate::testing::test_async(async {
            let platform = Platform::new();
            let config = crate::config::install(platform.clone(), false).await;

            // Start with a known config...
            config
                .load_from_string("test: 42", Some(SystemTime::now()))
                .unwrap();

            // Spawn a task which forwards the next change notification into a oneshot channel...
            let mut change_notifier = config.notifier();
            let (signal_tx, signal_rx) = tokio::sync::oneshot::channel();
            let _ = tokio::spawn(async move {
                if change_notifier.recv().await.is_ok() {
                    signal_tx.send(()).unwrap();
                }
            });

            // The initial config has to be visible...
            assert_eq!(config.current().config()["test"].as_i64(), Some(42));

            // Malformed data has to be rejected...
            let malformed = config.load_from_string("test: 'invalid", Some(SystemTime::now()));
            assert!(malformed.is_err());

            // ...without touching the initial config...
            assert_eq!(config.current().config()["test"].as_i64(), Some(42));

            // Now perform an actual change...
            config
                .load_from_string("test: 4242", Some(SystemTime::now()))
                .unwrap();

            // The listener has to be notified...
            if signal_rx.await.is_err() {
                panic!("Received invalid value...");
            }

            // ...and the new config has to be in place.
            assert_eq!(config.current().config()["test"].as_i64(), Some(4242));
        });
    }
}