ntp_metrics_exporter/
lib.rs1#![forbid(unsafe_code)]
9
10mod metrics;
11
12pub use metrics::Metrics;
13use tokio::io::AsyncWriteExt;
14use tokio::net::TcpListener;
15
16use std::{fmt::Write, net::SocketAddr, path::PathBuf};
17
18use clap::Parser;
19use ntp_daemon::{Config, ObservableState};
20
21#[derive(Parser)]
22#[command(version = "0.2.0", about = "Serve ntpd-rs openmetrics via http")]
23struct Cli {
24 #[arg(short, long)]
26 config: Option<PathBuf>,
27
28 #[arg(short, long)]
30 observation_socket: Option<PathBuf>,
31
32 #[arg(short = 'l', long = "listen", default_value = "127.0.0.1:9975")]
33 listen_socket: SocketAddr,
34}
35
36pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
37 let cli = Cli::parse();
38
39 let config = Config::from_args(cli.config, vec![], vec![]).await;
40
41 if let Err(ref e) = config {
42 println!("Warning: Unable to load configuration file: {e}");
43 }
44
45 let config = config.unwrap_or_default();
46
47 let observation_socket_path = match cli.observation_socket {
48 Some(path) => path,
49 None => match config.observe.path {
50 Some(path) => path,
51 None => "/run/ntpd-rs/observe".into(),
52 },
53 };
54
55 println!("starting ntp-metrics-exporter on {}", &cli.listen_socket);
56
57 let listener = TcpListener::bind(cli.listen_socket).await?;
58
59 loop {
60 let (mut tcp_stream, _) = listener.accept().await?;
61
62 let mut stream = tokio::net::UnixStream::connect(&observation_socket_path).await?;
63 let mut msg = Vec::with_capacity(16 * 1024);
64 let output: ObservableState = ntp_daemon::sockets::read_json(&mut stream, &mut msg).await?;
65 let metrics = Metrics::default();
66 metrics.fill(&output);
67 let registry = metrics.registry();
68
69 let mut content = String::with_capacity(4 * 1024);
70 prometheus_client::encoding::text::encode(&mut content, ®istry)?;
71
72 let mut buf = String::with_capacity(4 * 1024);
73
74 buf.push_str("HTTP/1.1 200 OK\r\n");
76 buf.push_str("content-type: text/plain\r\n");
77 write!(buf, "content-length: {}\r\n\r\n", content.len()).unwrap();
78
79 buf.push_str(&content);
81
82 tcp_stream.write_all(buf.as_bytes()).await.unwrap();
83 }
84}