ntp_ctl/
lib.rs

1//! This crate contains the control interface client for ntpd-rs and is not intended
2//! as a public interface at this time. It follows the same version as the main
3//! ntpd-rs crate, but that version is not intended to give any stability guarantee.
4//! Use at your own risk.
5//!
6//! Please visit the [ntpd-rs](https://github.com/pendulum-project/ntpd-rs) project
7//! for more information.
8#![forbid(unsafe_code)]
9
10use std::{path::PathBuf, process::ExitCode};
11
12use clap::{Parser, Subcommand};
13use ntp_daemon::{Config, ConfigUpdate, ObservableState};
14use ntp_metrics_exporter::Metrics;
15
16#[derive(Parser)]
17#[command(version = "0.2.0", about = "Query and configure the ntpd-rs daemon")]
18#[command(arg_required_else_help(true))]
19struct Cli {
20    #[command(subcommand)]
21    command: Command,
22
23    /// Which configuration file to read the socket paths from
24    #[arg(short, long)]
25    config: Option<PathBuf>,
26
27    /// Path of the observation socket
28    #[arg(short, long)]
29    observation_socket: Option<PathBuf>,
30
31    /// Path of the configuration socket
32    #[arg(short = 's', long)]
33    configuration_socket: Option<PathBuf>,
34}
35
// Subcommands understood by the control client.
//
// `Peers`, `System` and `Prometheus` are served by `print_state` using the
// observation socket, `Config` is served by `update_config` using the
// configuration socket, and `Validate` is handled by `validate` before any
// socket path is resolved.
//
// NOTE: explanations here are plain `//` comments; the user-visible help text
// of each subcommand is the `about` string in its attribute.
#[derive(Subcommand)]
enum Command {
    #[command(about = "Information about the peers the daemon is currently connected with")]
    Peers,
    #[command(about = "Information about the state of the daemon itself")]
    System,
    #[command(
        about = "Information about the state of the daemon and peers in the prometheus export format"
    )]
    Prometheus,
    // Carries the requested changes, which are sent to the daemon as JSON.
    #[command(about = "Adjust configuration (e.g. loglevel) of the daemon")]
    Config(ConfigUpdate),
    #[command(about = "Validate configuration")]
    Validate,
}
51
/// Selects which part of the daemon's observable state `print_state` writes
/// to standard output, and in which format.
enum PrintState {
    /// Print the `peers` field of the observed state as pretty-printed JSON.
    Peers,
    /// Print the `system` field of the observed state as pretty-printed JSON.
    System,
    /// Print the whole observed state using the prometheus text encoding.
    Prometheus,
}
57
58async fn validate(cli: Cli) -> std::io::Result<ExitCode> {
59    // Late completion not needed, so ignore result.
60    let _ = ntp_daemon::tracing::init(
61        tracing_subscriber::EnvFilter::new("info"),
62        Default::default(),
63    );
64    match Config::from_args(cli.config, vec![], vec![]).await {
65        Ok(config) => {
66            if config.check() {
67                eprintln!("Config looks good");
68                Ok(ExitCode::SUCCESS)
69            } else {
70                Ok(ExitCode::FAILURE)
71            }
72        }
73        Err(e) => {
74            eprintln!("Error: Could not load configuration: {e}");
75            Ok(ExitCode::FAILURE)
76        }
77    }
78}
79
80pub async fn main() -> std::io::Result<ExitCode> {
81    let cli = Cli::parse();
82
83    if matches!(cli.command, Command::Validate) {
84        return validate(cli).await;
85    }
86
87    let config = Config::from_args(cli.config, vec![], vec![]).await;
88
89    if let Err(ref e) = config {
90        println!("Warning: Unable to load configuration file: {e}");
91    }
92
93    let config = config.unwrap_or_default();
94
95    let observation = cli
96        .observation_socket
97        .or(config.observe.path)
98        .unwrap_or_else(|| PathBuf::from("/run/ntpd-rs/observe"));
99
100    let configuration = cli
101        .configuration_socket
102        .or(config.configure.path)
103        .unwrap_or_else(|| PathBuf::from("/run/ntpd-rs/configure"));
104
105    match cli.command {
106        Command::Peers => print_state(PrintState::Peers, observation).await,
107        Command::System => print_state(PrintState::System, observation).await,
108        Command::Prometheus => print_state(PrintState::Prometheus, observation).await,
109        Command::Config(config_update) => update_config(configuration, config_update).await,
110        Command::Validate => unreachable!(),
111    }
112}
113
114async fn print_state(
115    print: PrintState,
116    observe_socket: PathBuf,
117) -> Result<ExitCode, std::io::Error> {
118    let mut stream = match tokio::net::UnixStream::connect(&observe_socket).await {
119        Ok(stream) => stream,
120        Err(e) => {
121            eprintln!("Could not open socket at {}: {e}", observe_socket.display(),);
122            return Ok(ExitCode::FAILURE);
123        }
124    };
125
126    let mut msg = Vec::with_capacity(16 * 1024);
127    let output =
128        match ntp_daemon::sockets::read_json::<ObservableState>(&mut stream, &mut msg).await {
129            Ok(output) => output,
130            Err(e) => {
131                eprintln!("Failed to read state from observation socket: {e}");
132
133                return Ok(ExitCode::FAILURE);
134            }
135        };
136
137    match print {
138        PrintState::Peers => {
139            // Unwrap here is fine as our serializer is infallible.
140            println!("{}", serde_json::to_string_pretty(&output.peers).unwrap());
141        }
142        PrintState::System => {
143            // Unwrap here is fine as our serializer is infallible.
144            println!("{}", serde_json::to_string_pretty(&output.system).unwrap());
145        }
146        PrintState::Prometheus => {
147            let metrics = Metrics::default();
148            metrics.fill(&output);
149            let registry = metrics.registry();
150            let mut buf = String::new();
151
152            if let Err(e) = prometheus_client::encoding::text::encode(&mut buf, &registry) {
153                eprintln!("Failed to encode prometheus data: {e}");
154
155                return Ok(ExitCode::FAILURE);
156            }
157
158            println!("{buf}");
159        }
160    }
161
162    Ok(ExitCode::SUCCESS)
163}
164
165async fn update_config(
166    configuration_socket: PathBuf,
167    config_update: ConfigUpdate,
168) -> Result<ExitCode, std::io::Error> {
169    let mut stream = match tokio::net::UnixStream::connect(&configuration_socket).await {
170        Ok(stream) => stream,
171        Err(e) => {
172            eprintln!(
173                "Could not open socket at {}: {e}",
174                configuration_socket.display(),
175            );
176            return Ok(ExitCode::FAILURE);
177        }
178    };
179
180    match ntp_daemon::sockets::write_json(&mut stream, &config_update).await {
181        Ok(_) => Ok(ExitCode::SUCCESS),
182        Err(e) => {
183            eprintln!("Failed to update configuration: {e}");
184
185            Ok(ExitCode::FAILURE)
186        }
187    }
188}
189
#[cfg(test)]
mod tests {
    use std::os::unix::prelude::PermissionsExt;

    use ntp_daemon::{
        config::ObserveConfig,
        sockets::{create_unix_socket, read_json, write_json},
    };

    use super::*;

    /// Asserts that two exit codes are the same by comparing their `Debug`
    /// renderings.
    fn assert_exit_code(actual: ExitCode, expected: ExitCode) {
        assert_eq!(format!("{:?}", actual), format!("{:?}", expected));
    }

    /// Plays the daemon side of the observation socket for one request.
    ///
    /// Creates a unix socket named `socket_name` in the temp directory, runs
    /// `print_state(command, ..)` against it in a spawned task, answers the
    /// connection with an empty `ObservableState`, and returns whatever
    /// `print_state` returned. The outer `Result` carries errors of the test
    /// scaffolding itself.
    async fn write_socket_helper(
        command: PrintState,
        socket_name: &str,
    ) -> std::io::Result<Result<ExitCode, std::io::Error>> {
        let observe_config = ObserveConfig::default();

        // be careful with copying: tests run concurrently and should use a unique socket name!
        let socket_path = std::env::temp_dir().join(socket_name);
        // Remove a socket file left behind by an earlier run.
        if socket_path.exists() {
            std::fs::remove_file(&socket_path).unwrap();
        }

        let listener = create_unix_socket(&socket_path)?;
        std::fs::set_permissions(
            &socket_path,
            std::fs::Permissions::from_mode(observe_config.mode),
        )?;

        // The client must already be running before we block on `accept`.
        let client = tokio::spawn(super::print_state(command, socket_path));

        let state = ObservableState {
            system: Default::default(),
            peers: vec![],
            servers: vec![],
        };

        let (mut stream, _addr) = listener.accept().await?;
        write_json(&mut stream, &state).await?;

        Ok(client.await.unwrap())
    }

    #[tokio::test]
    async fn test_control_socket_peer() -> std::io::Result<()> {
        // be careful with copying: tests run concurrently and should use a unique socket name!
        let outcome = write_socket_helper(PrintState::Peers, "ntp-test-stream-6").await?;

        assert_exit_code(outcome.unwrap(), ExitCode::SUCCESS);

        Ok(())
    }

    #[tokio::test]
    async fn test_control_socket_system() -> std::io::Result<()> {
        // be careful with copying: tests run concurrently and should use a unique socket name!
        let outcome = write_socket_helper(PrintState::System, "ntp-test-stream-7").await?;

        assert_exit_code(outcome.unwrap(), ExitCode::SUCCESS);

        Ok(())
    }

    #[tokio::test]
    async fn test_control_socket_prometheus() -> std::io::Result<()> {
        // be careful with copying: tests run concurrently and should use a unique socket name!
        let outcome = write_socket_helper(PrintState::Prometheus, "ntp-test-stream-8").await?;

        assert_exit_code(outcome.unwrap(), ExitCode::SUCCESS);

        Ok(())
    }

    /// `update_config` must deliver the update unchanged to the listening side
    /// of the configuration socket and report success.
    #[tokio::test]
    async fn test_control_socket_config() -> std::io::Result<()> {
        let observe_config = ObserveConfig::default();

        // be careful with copying: tests run concurrently and should use a unique socket name!
        let socket_path = std::env::temp_dir().join("ntp-test-stream-9");
        if socket_path.exists() {
            std::fs::remove_file(&socket_path).unwrap();
        }

        let listener = create_unix_socket(&socket_path)?;
        std::fs::set_permissions(
            &socket_path,
            std::fs::Permissions::from_mode(observe_config.mode),
        )?;

        let update = ConfigUpdate {
            log_filter: Some(String::from("foo")),
            panic_threshold: Some(0.123),
        };

        let client = tokio::spawn(super::update_config(socket_path, update.clone()));

        let (mut stream, _addr) = listener.accept().await?;
        let mut msg = Vec::with_capacity(16 * 1024);
        let actual_update = read_json::<ConfigUpdate>(&mut stream, &mut msg).await?;

        let outcome = client.await.unwrap();

        assert_exit_code(outcome.unwrap(), ExitCode::SUCCESS);
        assert_eq!(update, actual_update);

        Ok(())
    }

    /// A reply that is valid JSON but not an `ObservableState` must make
    /// `print_state` report failure instead of returning an error.
    #[tokio::test]
    async fn test_control_socket_peer_invalid_input() -> std::io::Result<()> {
        let observe_config = ObserveConfig::default();

        // be careful with copying: tests run concurrently and should use a unique socket name!
        let socket_path = std::env::temp_dir().join("ntp-test-stream-10");
        if socket_path.exists() {
            std::fs::remove_file(&socket_path).unwrap();
        }

        let listener = create_unix_socket(&socket_path)?;
        std::fs::set_permissions(
            &socket_path,
            std::fs::Permissions::from_mode(observe_config.mode),
        )?;

        let client = tokio::spawn(super::print_state(PrintState::Peers, socket_path));

        let bogus_reply = 42u32;

        let (mut stream, _addr) = listener.accept().await?;
        write_json(&mut stream, &bogus_reply).await?;

        let outcome = client.await.unwrap();

        assert_exit_code(outcome.unwrap(), ExitCode::FAILURE);

        Ok(())
    }
}
345}