1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
use crate::sockets::create_unix_socket;
use crate::tracing::ReloadHandle;
use ntp_proto::{NtpDuration, StepThreshold};
use std::os::unix::fs::PermissionsExt;
use tokio::task::JoinHandle;
use tracing::warn;
use tracing_subscriber::EnvFilter;

use clap::Args;
use serde::{Deserialize, Serialize};

use super::{CombinedSystemConfig, ConfigureConfig};

fn parse_env_filter(input: &str) -> Result<String, tracing_subscriber::filter::ParseError> {
    // run the parser to error on any invalid input
    let _ = EnvFilter::builder().with_regex(false).parse(input)?;

    // but we actually send `String` over, because it is (De)Serialize
    Ok(input.to_string())
}

#[derive(Debug, Clone, PartialEq, Args, Serialize, Deserialize)]
pub struct ConfigUpdate {
    /// Change the log filter
    #[arg(long, value_parser = parse_env_filter)]
    pub log_filter: Option<String>,

    /// The maximum duration in seconds the system clock is allowed to change in a single jump
    /// before we conclude something is seriously wrong. This is used to limit
    /// the changes to the clock to reasonable ammounts, and stop issues with
    /// remote servers from causing us to drift too far.
    ///
    /// Note that this is not used during startup. To limit system clock changes
    /// during startup, use startup_panic_threshold
    #[arg(long)]
    pub panic_threshold: Option<f64>,
}

// Deal with reloading not being possible during testing.
pub trait LogReloader {
    fn update_log(&self, f: EnvFilter);
}

impl LogReloader for ReloadHandle {
    fn update_log(&self, f: EnvFilter) {
        self.modify(|l| *l.filter_mut() = f).unwrap();
    }
}

pub async fn spawn<H: LogReloader + Send + 'static>(
    config: ConfigureConfig,
    system_config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
    log_reload_handle: H,
) -> JoinHandle<std::io::Result<()>> {
    tokio::spawn(async move {
        let result = dynamic_configuration(config, system_config_sender, log_reload_handle).await;
        if let Err(ref e) = result {
            warn!("Abnormal termination of dynamic configurator: {}", e);
            warn!("The dynamic configurator will not be available");
        }
        result
    })
}

async fn dynamic_configuration<H: LogReloader>(
    config: ConfigureConfig,
    system_config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
    log_reload_handle: H,
) -> std::io::Result<()> {
    let path = match config.path {
        Some(path) => path,
        None => return Ok(()),
    };

    let peers_listener = create_unix_socket(&path)?;

    // this binary needs to run as root to be able to adjust the system clock.
    // by default, the socket inherits root permissions, but the client should not need
    // elevated permissions to read from the socket. So we explicitly set the permissions
    let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);
    std::fs::set_permissions(&path, permissions)?;

    let mut msg = Vec::with_capacity(16 * 1024);

    loop {
        let (mut stream, _addr) = peers_listener.accept().await?;

        let operation: ConfigUpdate = match crate::sockets::read_json(&mut stream, &mut msg).await {
            Ok(x) => x,
            Err(e) => {
                tracing::error!("could not parse data on socket: {:?}", e);
                continue;
            }
        };

        tracing::info!(?operation, "dynamic config update");

        if let Some(filter) = operation.log_filter {
            log_reload_handle.update_log(EnvFilter::new(filter));
        }

        if let Some(panic_threshold) = operation.panic_threshold {
            system_config_sender.send_modify(|config| {
                config.system.panic_threshold = StepThreshold {
                    forward: Some(NtpDuration::from_seconds(panic_threshold)),
                    backward: Some(NtpDuration::from_seconds(panic_threshold)),
                };
            });
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::sockets::write_json;

    use super::*;

    struct TestLogReloader {}
    impl LogReloader for TestLogReloader {
        fn update_log(&self, _f: EnvFilter) {}
    }

    #[tokio::test]
    async fn test_dynamic_configuration_change() {
        let (system_config_sender, system_config_receiver) =
            tokio::sync::watch::channel(CombinedSystemConfig::default());

        let path = std::env::temp_dir().join("ntp-test-stream-4");
        let config = ConfigureConfig {
            path: Some(path.clone()),
            mode: 0o700,
        };

        let handle = spawn(config, system_config_sender, TestLogReloader {}).await;

        // Ensure client has started.
        tokio::time::sleep(Duration::from_millis(10)).await;

        let mut stream = tokio::net::UnixStream::connect(path).await.unwrap();

        write_json(
            &mut stream,
            &ConfigUpdate {
                log_filter: Some("info".into()),
                panic_threshold: Some(600.),
            },
        )
        .await
        .unwrap();

        // Ensure message is handled.
        tokio::time::sleep(Duration::from_millis(10)).await;

        assert_eq!(
            system_config_receiver
                .borrow()
                .system
                .panic_threshold
                .forward,
            Some(NtpDuration::from_seconds(600.))
        );

        handle.abort();
    }
}