1#![forbid(unsafe_code)]
9
10use std::{path::PathBuf, process::ExitCode};
11
12use clap::{Parser, Subcommand};
13use ntp_daemon::{Config, ConfigUpdate, ObservableState};
14use ntp_metrics_exporter::Metrics;
15
16#[derive(Parser)]
17#[command(version = "0.2.0", about = "Query and configure the ntpd-rs daemon")]
18#[command(arg_required_else_help(true))]
19struct Cli {
20 #[command(subcommand)]
21 command: Command,
22
23 #[arg(short, long)]
25 config: Option<PathBuf>,
26
27 #[arg(short, long)]
29 observation_socket: Option<PathBuf>,
30
31 #[arg(short = 's', long)]
33 configuration_socket: Option<PathBuf>,
34}
35
// Subcommands accepted by the control tool.
//
// Plain `//` comments are used here on purpose: the user-facing help text of every variant is
// the explicit `about` string in its `#[command(...)]` attribute.
#[derive(Subcommand)]
enum Command {
    // Print the peer section of the daemon's observable state.
    #[command(about = "Information about the peers the daemon is currently connected with")]
    Peers,
    // Print the system section of the daemon's observable state.
    #[command(about = "Information about the state of the daemon itself")]
    System,
    // Print the observable state encoded through the metrics exporter.
    #[command(
        about = "Information about the state of the daemon and peers in the prometheus export format"
    )]
    Prometheus,
    // Send the wrapped `ConfigUpdate` to the daemon over the configuration socket.
    #[command(about = "Adjust configuration (e.g. loglevel) of the daemon")]
    Config(ConfigUpdate),
    // Load and check the configuration file without contacting the daemon sockets.
    #[command(about = "Validate configuration")]
    Validate,
}
51
/// Selects which view of the daemon's observable state `print_state` writes to stdout.
///
/// The enum carries no data, so it derives the cheap standard traits: it can be copied freely,
/// compared in assertions and shown in debug output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PrintState {
    /// The `peers` part of the state, as pretty-printed JSON.
    Peers,
    /// The `system` part of the state, as pretty-printed JSON.
    System,
    /// The whole state, in the prometheus text encoding.
    Prometheus,
}
57
58async fn validate(cli: Cli) -> std::io::Result<ExitCode> {
59 let _ = ntp_daemon::tracing::init(
61 tracing_subscriber::EnvFilter::new("info"),
62 Default::default(),
63 );
64 match Config::from_args(cli.config, vec![], vec![]).await {
65 Ok(config) => {
66 if config.check() {
67 eprintln!("Config looks good");
68 Ok(ExitCode::SUCCESS)
69 } else {
70 Ok(ExitCode::FAILURE)
71 }
72 }
73 Err(e) => {
74 eprintln!("Error: Could not load configuration: {e}");
75 Ok(ExitCode::FAILURE)
76 }
77 }
78}
79
80pub async fn main() -> std::io::Result<ExitCode> {
81 let cli = Cli::parse();
82
83 if matches!(cli.command, Command::Validate) {
84 return validate(cli).await;
85 }
86
87 let config = Config::from_args(cli.config, vec![], vec![]).await;
88
89 if let Err(ref e) = config {
90 println!("Warning: Unable to load configuration file: {e}");
91 }
92
93 let config = config.unwrap_or_default();
94
95 let observation = cli
96 .observation_socket
97 .or(config.observe.path)
98 .unwrap_or_else(|| PathBuf::from("/run/ntpd-rs/observe"));
99
100 let configuration = cli
101 .configuration_socket
102 .or(config.configure.path)
103 .unwrap_or_else(|| PathBuf::from("/run/ntpd-rs/configure"));
104
105 match cli.command {
106 Command::Peers => print_state(PrintState::Peers, observation).await,
107 Command::System => print_state(PrintState::System, observation).await,
108 Command::Prometheus => print_state(PrintState::Prometheus, observation).await,
109 Command::Config(config_update) => update_config(configuration, config_update).await,
110 Command::Validate => unreachable!(),
111 }
112}
113
114async fn print_state(
115 print: PrintState,
116 observe_socket: PathBuf,
117) -> Result<ExitCode, std::io::Error> {
118 let mut stream = match tokio::net::UnixStream::connect(&observe_socket).await {
119 Ok(stream) => stream,
120 Err(e) => {
121 eprintln!("Could not open socket at {}: {e}", observe_socket.display(),);
122 return Ok(ExitCode::FAILURE);
123 }
124 };
125
126 let mut msg = Vec::with_capacity(16 * 1024);
127 let output =
128 match ntp_daemon::sockets::read_json::<ObservableState>(&mut stream, &mut msg).await {
129 Ok(output) => output,
130 Err(e) => {
131 eprintln!("Failed to read state from observation socket: {e}");
132
133 return Ok(ExitCode::FAILURE);
134 }
135 };
136
137 match print {
138 PrintState::Peers => {
139 println!("{}", serde_json::to_string_pretty(&output.peers).unwrap());
141 }
142 PrintState::System => {
143 println!("{}", serde_json::to_string_pretty(&output.system).unwrap());
145 }
146 PrintState::Prometheus => {
147 let metrics = Metrics::default();
148 metrics.fill(&output);
149 let registry = metrics.registry();
150 let mut buf = String::new();
151
152 if let Err(e) = prometheus_client::encoding::text::encode(&mut buf, ®istry) {
153 eprintln!("Failed to encode prometheus data: {e}");
154
155 return Ok(ExitCode::FAILURE);
156 }
157
158 println!("{buf}");
159 }
160 }
161
162 Ok(ExitCode::SUCCESS)
163}
164
165async fn update_config(
166 configuration_socket: PathBuf,
167 config_update: ConfigUpdate,
168) -> Result<ExitCode, std::io::Error> {
169 let mut stream = match tokio::net::UnixStream::connect(&configuration_socket).await {
170 Ok(stream) => stream,
171 Err(e) => {
172 eprintln!(
173 "Could not open socket at {}: {e}",
174 configuration_socket.display(),
175 );
176 return Ok(ExitCode::FAILURE);
177 }
178 };
179
180 match ntp_daemon::sockets::write_json(&mut stream, &config_update).await {
181 Ok(_) => Ok(ExitCode::SUCCESS),
182 Err(e) => {
183 eprintln!("Failed to update configuration: {e}");
184
185 Ok(ExitCode::FAILURE)
186 }
187 }
188}
189
#[cfg(test)]
mod tests {
    use std::os::unix::prelude::PermissionsExt;

    use ntp_daemon::{
        config::ObserveConfig,
        sockets::{create_unix_socket, read_json, write_json},
    };

    use super::*;

    /// Returns `<temp dir>/<socket_name>` after deleting any file left at that path.
    ///
    /// The file is removed unconditionally and a "not found" error is ignored, which avoids
    /// the check-then-remove race of testing `exists()` first. Any other error is propagated.
    fn fresh_socket_path(socket_name: &str) -> std::io::Result<PathBuf> {
        let path = std::env::temp_dir().join(socket_name);

        match std::fs::remove_file(&path) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }

        Ok(path)
    }

    /// Sets the mode of a default `ObserveConfig` on the socket file at `path`.
    fn apply_default_mode(path: &std::path::Path) -> std::io::Result<()> {
        let config: ObserveConfig = Default::default();
        let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);

        std::fs::set_permissions(path, permissions)
    }

    /// Asserts that two exit codes are the same, comparing their `Debug` rendering so that no
    /// `PartialEq` implementation is required of `ExitCode`.
    fn assert_exit_code(actual: ExitCode, expected: ExitCode) {
        assert_eq!(format!("{actual:?}"), format!("{expected:?}"));
    }

    /// Runs `print_state(command, ..)` against a fresh socket named `socket_name` that serves
    /// one empty `ObservableState`, and returns the exit code `print_state` produced.
    async fn write_socket_helper(
        command: PrintState,
        socket_name: &str,
    ) -> std::io::Result<ExitCode> {
        let path = fresh_socket_path(socket_name)?;
        let peers_listener = create_unix_socket(&path)?;
        apply_default_mode(&path)?;

        // The client side runs concurrently; this task plays the daemon.
        let handle = tokio::spawn(super::print_state(command, path));

        let value = ObservableState {
            system: Default::default(),
            peers: vec![],
            servers: vec![],
        };

        let (mut stream, _addr) = peers_listener.accept().await?;
        write_json(&mut stream, &value).await?;

        handle.await.unwrap()
    }

    #[tokio::test]
    async fn test_control_socket_peer() -> std::io::Result<()> {
        let exit_code = write_socket_helper(PrintState::Peers, "ntp-test-stream-6").await?;

        assert_exit_code(exit_code, ExitCode::SUCCESS);

        Ok(())
    }

    #[tokio::test]
    async fn test_control_socket_system() -> std::io::Result<()> {
        let exit_code = write_socket_helper(PrintState::System, "ntp-test-stream-7").await?;

        assert_exit_code(exit_code, ExitCode::SUCCESS);

        Ok(())
    }

    #[tokio::test]
    async fn test_control_socket_prometheus() -> std::io::Result<()> {
        let exit_code = write_socket_helper(PrintState::Prometheus, "ntp-test-stream-8").await?;

        assert_exit_code(exit_code, ExitCode::SUCCESS);

        Ok(())
    }

    #[tokio::test]
    async fn test_control_socket_config() -> std::io::Result<()> {
        let path = fresh_socket_path("ntp-test-stream-9")?;
        let peers_listener = create_unix_socket(&path)?;
        apply_default_mode(&path)?;

        let update = ConfigUpdate {
            log_filter: Some("foo".to_string()),
            panic_threshold: Some(0.123),
        };

        let handle = tokio::spawn(super::update_config(path, update.clone()));

        // Play the daemon: accept the connection and read back what the client sent.
        let (mut stream, _addr) = peers_listener.accept().await?;
        let mut msg = Vec::with_capacity(16 * 1024);
        let actual_update = read_json::<ConfigUpdate>(&mut stream, &mut msg).await?;

        let exit_code = handle.await.unwrap()?;

        assert_exit_code(exit_code, ExitCode::SUCCESS);
        assert_eq!(update, actual_update);

        Ok(())
    }

    #[tokio::test]
    async fn test_control_socket_peer_invalid_input() -> std::io::Result<()> {
        let path = fresh_socket_path("ntp-test-stream-10")?;
        let peers_listener = create_unix_socket(&path)?;
        apply_default_mode(&path)?;

        let handle = tokio::spawn(super::print_state(PrintState::Peers, path));

        // A bare number is valid JSON but not an `ObservableState`, so the client must fail.
        let value = 42u32;

        let (mut stream, _addr) = peers_listener.accept().await?;
        write_json(&mut stream, &value).await?;

        let exit_code = handle.await.unwrap()?;

        assert_exit_code(exit_code, ExitCode::FAILURE);

        Ok(())
    }
}