ntp_daemon/config/
dynamic.rs

1use crate::sockets::create_unix_socket;
2use crate::tracing::ReloadHandle;
3use ntp_proto::{NtpDuration, StepThreshold};
4use std::os::unix::fs::PermissionsExt;
5use tokio::task::JoinHandle;
6use tracing::warn;
7use tracing_subscriber::EnvFilter;
8
9use clap::Args;
10use serde::{Deserialize, Serialize};
11
12use super::{CombinedSystemConfig, ConfigureConfig};
13
14fn parse_env_filter(input: &str) -> Result<String, tracing_subscriber::filter::ParseError> {
15    // run the parser to error on any invalid input
16    let _ = EnvFilter::builder().with_regex(false).parse(input)?;
17
18    // but we actually send `String` over, because it is (De)Serialize
19    Ok(input.to_string())
20}
21
22#[derive(Debug, Clone, PartialEq, Args, Serialize, Deserialize)]
23pub struct ConfigUpdate {
24    /// Change the log filter
25    #[arg(long, value_parser = parse_env_filter)]
26    pub log_filter: Option<String>,
27
28    /// The maximum duration in seconds the system clock is allowed to change in a single jump
29    /// before we conclude something is seriously wrong. This is used to limit
30    /// the changes to the clock to reasonable ammounts, and stop issues with
31    /// remote servers from causing us to drift too far.
32    ///
33    /// Note that this is not used during startup. To limit system clock changes
34    /// during startup, use startup_panic_threshold
35    #[arg(long)]
36    pub panic_threshold: Option<f64>,
37}
38
// Deal with reloading not being possible during testing.
//
// The dynamic configurator is generic over this trait rather than tied to
// `ReloadHandle`, so tests can supply an implementation that does nothing.
pub trait LogReloader {
    /// Called with the newly requested filter whenever a received
    /// `ConfigUpdate` carries a `log_filter`.
    fn update_log(&self, f: EnvFilter);
}
43
44impl LogReloader for ReloadHandle {
45    fn update_log(&self, f: EnvFilter) {
46        self.modify(|l| *l.filter_mut() = f).unwrap();
47    }
48}
49
50pub async fn spawn<H: LogReloader + Send + 'static>(
51    config: ConfigureConfig,
52    system_config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
53    log_reload_handle: H,
54) -> JoinHandle<std::io::Result<()>> {
55    tokio::spawn(async move {
56        let result = dynamic_configuration(config, system_config_sender, log_reload_handle).await;
57        if let Err(ref e) = result {
58            warn!("Abnormal termination of dynamic configurator: {}", e);
59            warn!("The dynamic configurator will not be available");
60        }
61        result
62    })
63}
64
65async fn dynamic_configuration<H: LogReloader>(
66    config: ConfigureConfig,
67    system_config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
68    log_reload_handle: H,
69) -> std::io::Result<()> {
70    let path = match config.path {
71        Some(path) => path,
72        None => return Ok(()),
73    };
74
75    let peers_listener = create_unix_socket(&path)?;
76
77    // this binary needs to run as root to be able to adjust the system clock.
78    // by default, the socket inherits root permissions, but the client should not need
79    // elevated permissions to read from the socket. So we explicitly set the permissions
80    let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);
81    std::fs::set_permissions(&path, permissions)?;
82
83    let mut msg = Vec::with_capacity(16 * 1024);
84
85    loop {
86        let (mut stream, _addr) = peers_listener.accept().await?;
87
88        let operation: ConfigUpdate = match crate::sockets::read_json(&mut stream, &mut msg).await {
89            Ok(x) => x,
90            Err(e) => {
91                tracing::error!("could not parse data on socket: {:?}", e);
92                continue;
93            }
94        };
95
96        tracing::info!(?operation, "dynamic config update");
97
98        if let Some(filter) = operation.log_filter {
99            log_reload_handle.update_log(EnvFilter::new(filter));
100        }
101
102        if let Some(panic_threshold) = operation.panic_threshold {
103            system_config_sender.send_modify(|config| {
104                config.system.panic_threshold = StepThreshold {
105                    forward: Some(NtpDuration::from_seconds(panic_threshold)),
106                    backward: Some(NtpDuration::from_seconds(panic_threshold)),
107                };
108            });
109        }
110    }
111}
112
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::sockets::write_json;

    use super::*;

    // Upper bound on every wait in this module, so that a broken configurator
    // fails the test instead of hanging it.
    const TEST_TIMEOUT: Duration = Duration::from_secs(5);

    // Log reloading is not possible during testing, so the filter is ignored.
    struct TestLogReloader {}
    impl LogReloader for TestLogReloader {
        fn update_log(&self, _f: EnvFilter) {}
    }

    // Sends a `ConfigUpdate` over the configuration socket and checks that the
    // panic threshold arrives in the shared system configuration.
    #[tokio::test]
    async fn test_dynamic_configuration_change() {
        let (system_config_sender, mut system_config_receiver) =
            tokio::sync::watch::channel(CombinedSystemConfig::default());

        let path = std::env::temp_dir().join("ntp-test-stream-4");
        let config = ConfigureConfig {
            path: Some(path.clone()),
            mode: 0o700,
        };

        let handle = spawn(config, system_config_sender, TestLogReloader {}).await;

        // The socket is created by the spawned task. Retry connecting until it
        // accepts connections rather than hoping a fixed sleep was long enough.
        let mut stream = tokio::time::timeout(TEST_TIMEOUT, async {
            loop {
                match tokio::net::UnixStream::connect(&path).await {
                    Ok(stream) => break stream,
                    Err(_) => tokio::time::sleep(Duration::from_millis(5)).await,
                }
            }
        })
        .await
        .expect("configuration socket did not become available in time");

        write_json(
            &mut stream,
            &ConfigUpdate {
                log_filter: Some("info".into()),
                panic_threshold: Some(600.),
            },
        )
        .await
        .unwrap();

        // Wait for the configurator to publish the modified configuration on
        // the watch channel; this resolves as soon as the update is applied.
        tokio::time::timeout(TEST_TIMEOUT, system_config_receiver.changed())
            .await
            .expect("configuration update was not applied in time")
            .expect("system config sender was dropped");

        assert_eq!(
            system_config_receiver
                .borrow()
                .system
                .panic_threshold
                .forward,
            Some(NtpDuration::from_seconds(600.))
        );

        handle.abort();
    }
}