use crate::sockets::create_unix_socket;
use crate::tracing::ReloadHandle;
use ntp_proto::{NtpDuration, StepThreshold};
use std::os::unix::fs::PermissionsExt;
use tokio::task::JoinHandle;
use tracing::warn;
use tracing_subscriber::EnvFilter;
use clap::Args;
use serde::{Deserialize, Serialize};
use super::{CombinedSystemConfig, ConfigureConfig};
fn parse_env_filter(input: &str) -> Result<String, tracing_subscriber::filter::ParseError> {
let _ = EnvFilter::builder().with_regex(false).parse(input)?;
Ok(input.to_string())
}
#[derive(Debug, Clone, PartialEq, Args, Serialize, Deserialize)]
pub struct ConfigUpdate {
#[arg(long, value_parser = parse_env_filter)]
pub log_filter: Option<String>,
#[arg(long)]
pub panic_threshold: Option<f64>,
}
pub trait LogReloader {
fn update_log(&self, f: EnvFilter);
}
impl LogReloader for ReloadHandle {
fn update_log(&self, f: EnvFilter) {
self.modify(|l| *l.filter_mut() = f).unwrap();
}
}
pub async fn spawn<H: LogReloader + Send + 'static>(
config: ConfigureConfig,
system_config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
log_reload_handle: H,
) -> JoinHandle<std::io::Result<()>> {
tokio::spawn(async move {
let result = dynamic_configuration(config, system_config_sender, log_reload_handle).await;
if let Err(ref e) = result {
warn!("Abnormal termination of dynamic configurator: {}", e);
warn!("The dynamic configurator will not be available");
}
result
})
}
async fn dynamic_configuration<H: LogReloader>(
config: ConfigureConfig,
system_config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
log_reload_handle: H,
) -> std::io::Result<()> {
let path = match config.path {
Some(path) => path,
None => return Ok(()),
};
let peers_listener = create_unix_socket(&path)?;
let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);
std::fs::set_permissions(&path, permissions)?;
let mut msg = Vec::with_capacity(16 * 1024);
loop {
let (mut stream, _addr) = peers_listener.accept().await?;
let operation: ConfigUpdate = match crate::sockets::read_json(&mut stream, &mut msg).await {
Ok(x) => x,
Err(e) => {
tracing::error!("could not parse data on socket: {:?}", e);
continue;
}
};
tracing::info!(?operation, "dynamic config update");
if let Some(filter) = operation.log_filter {
log_reload_handle.update_log(EnvFilter::new(filter));
}
if let Some(panic_threshold) = operation.panic_threshold {
system_config_sender.send_modify(|config| {
config.system.panic_threshold = StepThreshold {
forward: Some(NtpDuration::from_seconds(panic_threshold)),
backward: Some(NtpDuration::from_seconds(panic_threshold)),
};
});
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::sockets::write_json;
use super::*;
struct TestLogReloader {}
impl LogReloader for TestLogReloader {
fn update_log(&self, _f: EnvFilter) {}
}
#[tokio::test]
async fn test_dynamic_configuration_change() {
let (system_config_sender, system_config_receiver) =
tokio::sync::watch::channel(CombinedSystemConfig::default());
let path = std::env::temp_dir().join("ntp-test-stream-4");
let config = ConfigureConfig {
path: Some(path.clone()),
mode: 0o700,
};
let handle = spawn(config, system_config_sender, TestLogReloader {}).await;
tokio::time::sleep(Duration::from_millis(10)).await;
let mut stream = tokio::net::UnixStream::connect(path).await.unwrap();
write_json(
&mut stream,
&ConfigUpdate {
log_filter: Some("info".into()),
panic_threshold: Some(600.),
},
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(
system_config_receiver
.borrow()
.system
.panic_threshold
.forward,
Some(NtpDuration::from_seconds(600.))
);
handle.abort();
}
}