ntp_daemon/config/
dynamic.rs1use crate::sockets::create_unix_socket;
2use crate::tracing::ReloadHandle;
3use ntp_proto::{NtpDuration, StepThreshold};
4use std::os::unix::fs::PermissionsExt;
5use tokio::task::JoinHandle;
6use tracing::warn;
7use tracing_subscriber::EnvFilter;
8
9use clap::Args;
10use serde::{Deserialize, Serialize};
11
12use super::{CombinedSystemConfig, ConfigureConfig};
13
14fn parse_env_filter(input: &str) -> Result<String, tracing_subscriber::filter::ParseError> {
15 let _ = EnvFilter::builder().with_regex(false).parse(input)?;
17
18 Ok(input.to_string())
20}
21
// A single dynamic configuration request.
//
// The same type is used as a set of command line arguments (`Args`) and as the message
// that is serialized onto / deserialized from the configuration socket. Every field is
// optional; a field left as `None` is not touched when the update is applied.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros pick up `///`
// doc comments as help text, so adding them would change the program's output.
#[derive(Debug, Clone, PartialEq, Args, Serialize, Deserialize)]
pub struct ConfigUpdate {
    // New log filter, kept as the raw filter string. On the command line it is validated
    // by `parse_env_filter` before being accepted.
    #[arg(long, value_parser = parse_env_filter)]
    pub log_filter: Option<String>,

    // New panic threshold in seconds. When applied it is used for both the forward and
    // the backward direction of the system's panic threshold.
    #[arg(long)]
    pub panic_threshold: Option<f64>,
}
38
/// Something that can install a new log filter while the program is running.
///
/// The dynamic configurator is generic over this trait, so it does not depend on a
/// concrete reload handle type.
pub trait LogReloader {
    /// Makes `f` the active log filter.
    fn update_log(&self, f: EnvFilter);
}
43
44impl LogReloader for ReloadHandle {
45 fn update_log(&self, f: EnvFilter) {
46 self.modify(|l| *l.filter_mut() = f).unwrap();
47 }
48}
49
50pub async fn spawn<H: LogReloader + Send + 'static>(
51 config: ConfigureConfig,
52 system_config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
53 log_reload_handle: H,
54) -> JoinHandle<std::io::Result<()>> {
55 tokio::spawn(async move {
56 let result = dynamic_configuration(config, system_config_sender, log_reload_handle).await;
57 if let Err(ref e) = result {
58 warn!("Abnormal termination of dynamic configurator: {}", e);
59 warn!("The dynamic configurator will not be available");
60 }
61 result
62 })
63}
64
65async fn dynamic_configuration<H: LogReloader>(
66 config: ConfigureConfig,
67 system_config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
68 log_reload_handle: H,
69) -> std::io::Result<()> {
70 let path = match config.path {
71 Some(path) => path,
72 None => return Ok(()),
73 };
74
75 let peers_listener = create_unix_socket(&path)?;
76
77 let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);
81 std::fs::set_permissions(&path, permissions)?;
82
83 let mut msg = Vec::with_capacity(16 * 1024);
84
85 loop {
86 let (mut stream, _addr) = peers_listener.accept().await?;
87
88 let operation: ConfigUpdate = match crate::sockets::read_json(&mut stream, &mut msg).await {
89 Ok(x) => x,
90 Err(e) => {
91 tracing::error!("could not parse data on socket: {:?}", e);
92 continue;
93 }
94 };
95
96 tracing::info!(?operation, "dynamic config update");
97
98 if let Some(filter) = operation.log_filter {
99 log_reload_handle.update_log(EnvFilter::new(filter));
100 }
101
102 if let Some(panic_threshold) = operation.panic_threshold {
103 system_config_sender.send_modify(|config| {
104 config.system.panic_threshold = StepThreshold {
105 forward: Some(NtpDuration::from_seconds(panic_threshold)),
106 backward: Some(NtpDuration::from_seconds(panic_threshold)),
107 };
108 });
109 }
110 }
111}
112
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::sockets::write_json;

    use super::*;

    /// Log reloader that discards every filter it is given.
    struct TestLogReloader {}
    impl LogReloader for TestLogReloader {
        fn update_log(&self, _f: EnvFilter) {}
    }

    /// Sends one update over the socket and checks that the panic threshold arrives in the
    /// shared system configuration.
    #[tokio::test]
    async fn test_dynamic_configuration_change() {
        let (sender, receiver) = tokio::sync::watch::channel(CombinedSystemConfig::default());

        let socket_path = std::env::temp_dir().join("ntp-test-stream-4");

        let configurator = spawn(
            ConfigureConfig {
                path: Some(socket_path.clone()),
                mode: 0o700,
            },
            sender,
            TestLogReloader {},
        )
        .await;

        // Give the spawned task a moment to run before connecting.
        tokio::time::sleep(Duration::from_millis(10)).await;

        let mut client = tokio::net::UnixStream::connect(socket_path).await.unwrap();

        let update = ConfigUpdate {
            log_filter: Some("info".into()),
            panic_threshold: Some(600.),
        };
        write_json(&mut client, &update).await.unwrap();

        // Give the configurator a moment to handle the message.
        tokio::time::sleep(Duration::from_millis(10)).await;

        assert_eq!(
            receiver.borrow().system.panic_threshold.forward,
            Some(NtpDuration::from_seconds(600.))
        );

        configurator.abort();
    }
}