statime_linux/metrics/
exporter.rs

1use std::path::{Path, PathBuf};
2
3use clap::Parser;
4use serde::{Deserialize, Serialize};
5use tokio::{
6    io::{AsyncReadExt, AsyncWriteExt},
7    net::{TcpListener, UnixStream},
8};
9
10use super::format::format_response;
11use crate::{initialize_logging_parse_config, observer::ObservableInstanceState};
12
13#[derive(Debug, Serialize, Deserialize)]
14pub struct ObservableState {
15    pub program: ProgramData,
16    pub instance: ObservableInstanceState,
17}
18
19#[derive(Debug, Serialize, Deserialize)]
20pub struct ProgramData {
21    pub version: String,
22    pub build_commit: String,
23    pub build_commit_date: String,
24    pub uptime_seconds: f64,
25}
26
27impl ProgramData {
28    pub fn with_uptime(uptime_seconds: f64) -> ProgramData {
29        ProgramData {
30            uptime_seconds,
31            ..Default::default()
32        }
33    }
34}
35
36impl Default for ProgramData {
37    fn default() -> Self {
38        Self {
39            version: env!("CARGO_PKG_VERSION").to_owned(),
40            build_commit: env!("STATIME_GIT_REV").to_owned(),
41            build_commit_date: env!("STATIME_GIT_DATE").to_owned(),
42            uptime_seconds: 0.0,
43        }
44    }
45}
46
47#[derive(Parser, Debug)]
48#[clap(author, version, about, long_about = None)]
49pub(crate) struct Args {
50    /// Configuration file to use
51    #[clap(
52        long = "config",
53        short = 'c',
54        default_value = "/etc/statime/statime.toml"
55    )]
56    config: PathBuf,
57}
58
59pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
60    let options = Args::parse();
61
62    let config = initialize_logging_parse_config(&options.config);
63
64    let observation_socket_path = match config.observability.observation_path {
65        Some(path) => path,
66        None => {
67            eprintln!(
68                "An observation socket path must be configured using the observation-path option \
69                 in the [observability] section of the configuration"
70            );
71            std::process::exit(1);
72        }
73    };
74
75    println!(
76        "starting statime-metrics-exporter on {}",
77        &config.observability.metrics_exporter_listen
78    );
79
80    let listener = TcpListener::bind(&config.observability.metrics_exporter_listen).await?;
81    let mut buf = String::with_capacity(4 * 1024);
82
83    loop {
84        let (mut tcp_stream, _) = listener.accept().await?;
85
86        // Wait until a request was sent, dropping the bytes read when this scope ends
87        // to ensure we don't accidentally use them afterwards
88        {
89            // Receive all data until the header was fully received, or until max buf size
90            let mut buf = [0u8; 2048];
91            let mut bytes_read = 0;
92            loop {
93                bytes_read += tcp_stream.read(&mut buf[bytes_read..]).await?;
94
95                // The headers end with two CRLFs in a row
96                if buf[0..bytes_read].windows(4).any(|w| w == b"\r\n\r\n") {
97                    break;
98                }
99
100                // Headers should easily fit within the buffer
101                // If we have not found the end yet, we are not going to
102                if bytes_read >= buf.len() {
103                    tracing::warn!("Metrics connection request too long");
104                    continue;
105                }
106            }
107
108            // We only respond to GET requests
109            if !buf[0..bytes_read].starts_with(b"GET ") {
110                tracing::warn!("Metrics connection wasn't get");
111                continue;
112            }
113        }
114
115        buf.clear();
116        match handler(&mut buf, &observation_socket_path).await {
117            Ok(()) => {
118                tcp_stream.write_all(buf.as_bytes()).await?;
119            }
120            Err(e) => {
121                log::warn!("error: {e}");
122                const ERROR_REPONSE: &str = concat!(
123                    "HTTP/1.1 500 Internal Server Error\r\n",
124                    "content-type: text/plain\r\n",
125                    "content-length: 0\r\n\r\n",
126                );
127
128                tcp_stream.write_all(ERROR_REPONSE.as_bytes()).await?;
129            }
130        }
131    }
132}
133
134pub async fn read_json<'a, T>(
135    stream: &mut UnixStream,
136    buffer: &'a mut Vec<u8>,
137) -> std::io::Result<T>
138where
139    T: serde::Deserialize<'a>,
140{
141    buffer.clear();
142
143    let n = stream.read_buf(buffer).await?;
144    buffer.truncate(n);
145    serde_json::from_slice(buffer)
146        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
147}
148
149async fn handler(buf: &mut String, observation_socket_path: &Path) -> std::io::Result<()> {
150    let mut stream = tokio::net::UnixStream::connect(observation_socket_path).await?;
151    let mut msg = Vec::with_capacity(16 * 1024);
152    let observable_state: ObservableState = read_json(&mut stream, &mut msg).await?;
153
154    format_response(buf, &observable_state)
155        .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "formatting error"))
156}
157
#[cfg(test)]
mod tests {
    use std::path::Path;

    use clap::Parser;

    use crate::metrics::exporter::Args;

    // Stand-in for argv[0]; clap expects the program name as first argument.
    const BINARY: &str = "/usr/bin/statime-metrics-exporter";

    /// The path given with `-c` must end up unchanged in `Args::config`.
    #[test]
    fn cli_config() {
        let config_str = "/foo/bar/statime.toml";
        let argv = [BINARY, "-c", config_str];

        let parsed = Args::try_parse_from(&argv).unwrap();

        assert_eq!(parsed.config.as_path(), Path::new(config_str));
    }
}