use std::sync::Arc;
use std::time::SystemTime;
use arc_swap::ArcSwap;
use yaml_rust::{Yaml, YamlLoader};
use crate::platform::Platform;
use crate::spawn;
use anyhow::Context;
use std::path::Path;
/// Holds the system configuration as a YAML document which is read from a file.
///
/// The current document can be swapped at runtime (see `load` and `load_from_string`);
/// interested parties can subscribe to change notifications via `notifier`.
pub struct Config {
/// Path of the config file which is read by `load` and written by `store`.
filename: String,
/// Sending side of the change notification channel. `load_from_string` sends `()` on it
/// after a new config has been stored.
tx: tokio::sync::broadcast::Sender<()>,
/// The currently active config document paired with the modification timestamp that was
/// supplied when it was loaded (`None` if no timestamp was available).
config: ArcSwap<(Yaml, Option<SystemTime>)>,
}
/// Receiver type handed out by `Config::notifier`. A `()` is sent to it every time
/// `Config::load_from_string` has stored a new config.
pub type ChangeNotifier = tokio::sync::broadcast::Receiver<()>;
/// A handle on the config as it was when `Config::current` was invoked.
///
/// The handle owns a reference counted pointer to the config tuple, therefore a later
/// reload of the `Config` does not change what this handle refers to.
pub struct Handle {
/// The config document and the timestamp it was loaded with.
config: Arc<(Yaml, Option<SystemTime>)>,
}
impl Config {
pub fn new(file: &str) -> Self {
let (tx, _) = tokio::sync::broadcast::channel(1);
Config {
filename: file.to_owned(),
config: ArcSwap::new(Arc::new((Yaml::Null, None))),
tx,
}
}
pub fn notifier(&self) -> ChangeNotifier {
self.tx.subscribe()
}
pub fn current(&self) -> Handle {
Handle {
config: self.config.load_full(),
}
}
async fn last_modified(&self) -> Option<SystemTime> {
tokio::fs::metadata(&self.filename)
.await
.ok()
.filter(|meta| meta.is_file())
.and_then(|meta| meta.modified().ok())
}
pub async fn load(&self) -> anyhow::Result<()> {
log::info!("Loading config file {}...", &self.filename);
if let Ok(metadata) = tokio::fs::metadata(&self.filename).await {
if !metadata.is_file() {
log::info!("Config file doesn't exist or is an unmounted docker volume - skipping config load.");
return Ok(());
}
}
let config_data = match tokio::fs::read_to_string(&self.filename).await {
Ok(data) => data,
Err(error) => {
return Err(anyhow::anyhow!(
"Cannot load config file {}: {}",
&self.filename,
error
));
}
};
let last_modified = tokio::fs::metadata(&self.filename)
.await
.ok()
.and_then(|metadata| metadata.modified().ok());
self.load_from_string(config_data.as_str(), last_modified)
}
pub async fn store(&self, config: &str) -> anyhow::Result<()> {
log::info!(
"Programmatically updating the config file {}...",
&self.filename
);
if let Err(error) = YamlLoader::load_from_str(config) {
Err(anyhow::anyhow!("Cannot parse config data: {}", error))
} else {
tokio::fs::write(&self.filename, config)
.await
.context("Failed to write to config file!")?;
log::info!("Config has been updated successfully!");
Ok(())
}
}
pub fn load_from_string(
&self,
data: &str,
last_modified: Option<SystemTime>,
) -> anyhow::Result<()> {
let docs = match YamlLoader::load_from_str(data) {
Ok(docs) => docs,
Err(error) => {
return Err(anyhow::anyhow!(
"Cannot parse config file {}: {}",
&self.filename,
error
));
}
};
self.config.store(Arc::new((
docs.get(0).unwrap_or(&Yaml::Null).clone(),
last_modified,
)));
let _ = self.tx.clone().send(());
Ok(())
}
}
impl Handle {
    /// Provides access to the config document this handle refers to.
    pub fn config(&self) -> &Yaml {
        // The timestamp stored next to the document is of no interest here.
        let (document, _) = &*self.config;
        document
    }
}
pub async fn install(platform: Arc<Platform>, auto_reload: bool) -> Arc<Config> {
let path = Path::new("config").to_path_buf();
if let Err(error) = tokio::fs::create_dir_all(path.clone()).await {
log::warn!(
"Failed to create config base directory {}: {}",
path.to_string_lossy(),
error
)
}
let config = Arc::new(Config::new("config/settings.yml"));
platform.register::<Config>(config.clone());
if let Err(error) = config.load().await {
log::error!("{}", error);
}
if auto_reload {
run_config_change_monitor(platform, config.clone());
}
config
}
/// Spawns a background task which polls the config file and re-loads it once its
/// modification timestamp is newer than the one of the config currently in memory.
///
/// The task keeps running as long as `platform.is_running()` reports `true`.
fn run_config_change_monitor(platform: Arc<Platform>, config: Arc<Config>) {
    /// Pause between two checks of the config file.
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);

    spawn!(async move {
        while platform.is_running() {
            tokio::time::sleep(POLL_INTERVAL).await;

            let on_disk = config.last_modified().await;
            let in_memory = config.config.load().1;

            let needs_reload = match (on_disk, in_memory) {
                // Without a timestamp for the file there is nothing to compare against.
                (None, _) => false,
                // Nothing carrying a timestamp has been loaded so far.
                (Some(_), None) => true,
                (Some(file_time), Some(loaded_time)) => file_time > loaded_time,
            };
            if !needs_reload {
                continue;
            }

            match config.load().await {
                Ok(()) => log::info!("System configuration was re-loaded."),
                Err(error) => log::error!("Failed to re-load system config: {}", error),
            }
        }
    });
}
#[cfg(test)]
mod tests {
    use crate::platform::Platform;
    use std::time::SystemTime;

    /// Ensures that a valid config replaces the current one and notifies subscribers,
    /// whereas an invalid config is rejected and leaves the current one in place.
    #[test]
    fn ensure_config_update_works() {
        crate::testing::test_async(async {
            let platform = Platform::new();
            let config = crate::config::install(platform.clone(), false).await;

            // Start out with a known config...
            config
                .load_from_string("test: 42", Some(SystemTime::now()))
                .unwrap();

            // ...and forward the next change notification into a oneshot channel which is
            // awaited at the end of the test.
            let mut change_notifier = config.notifier();
            let (tx, rx) = tokio::sync::oneshot::channel();
            let _ = tokio::spawn(async move {
                if change_notifier.recv().await.is_ok() {
                    tx.send(()).unwrap();
                }
            });

            assert_eq!(config.current().config()["test"].as_i64(), Some(42));

            // Malformed data must be rejected without touching the current config.
            let rejected = config.load_from_string("test: 'invalid", Some(SystemTime::now()));
            assert!(rejected.is_err());
            assert_eq!(config.current().config()["test"].as_i64(), Some(42));

            // Valid data must be applied and has to trigger the notification.
            config
                .load_from_string("test: 4242", Some(SystemTime::now()))
                .unwrap();

            if rx.await.is_err() {
                panic!("Received invalid value...");
            }

            assert_eq!(config.current().config()["test"].as_i64(), Some(4242));
        });
    }
}