elasticsearch_exporter/
lib.rs

1#![feature(hash_extract_if)]
2
3//! # Proper Elasticsearch exporter
4#![deny(
5    warnings,
6    bad_style,
7    dead_code,
8    improper_ctypes,
9    non_shorthand_field_patterns,
10    no_mangle_generic_items,
11    overflowing_literals,
12    path_statements,
13    patterns_in_fns_without_body,
14    private_bounds,
15    private_interfaces,
16    unconditional_recursion,
17    unused,
18    unused_allocation,
19    unused_comparisons,
20    unused_parens,
21    while_true,
22    missing_debug_implementations,
23    missing_docs,
24    trivial_casts,
25    unused_extern_crates,
26    unused_qualifications,
27    trivial_numeric_casts,
28    unreachable_pub,
29    unused_import_braces,
30    unused_results,
31    deprecated,
32    unknown_lints,
33    unreachable_code,
34    unused_mut
35)]
36
37#[macro_use]
38extern crate log;
39#[macro_use]
40extern crate serde_derive;
41use elasticsearch::cert::{Certificate, CertificateValidation};
42use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
43use elasticsearch::Elasticsearch;
44use prometheus::{default_registry, HistogramOpts, HistogramVec, IntGaugeVec, Opts};
45use std::collections::{BTreeMap, HashMap};
46use std::fs::File;
47use std::io::Read;
48use std::sync::Arc;
49use std::time::Duration;
50
51/// Generic collector of Elasticsearch metrics
52pub mod collection;
53/// Metric
54pub mod metric;
55
56mod options;
57pub use options::{CertificateValidationOptions, ExporterOptions};
58
59/// Reserved labels
60pub mod reserved;
61
62/// Cluster metadata
63pub mod metadata;
64
65pub(crate) mod metrics;
66
67/// Labels type with ordered keys
68pub type Labels = BTreeMap<String, String>;
69
70/// Collection labels
71pub type CollectionLabels = BTreeMap<String, Vec<String>>;
72
73/// Exporter polling intervals
74pub type ExporterPollIntervals = HashMap<String, Duration>;
75
76/// Exporter metrics switch ON/OFF
77pub type ExporterMetricsSwitch = BTreeMap<String, bool>;
78
79/// Elasticsearch exporter
80#[derive(Debug, Clone)]
81pub struct Exporter(Arc<Inner>);
82
83#[derive(Debug)]
84struct Inner {
85    /// Name of Elasticsearch cluster exporter is working
86    cluster_name: String,
87    /// Elasticsearch client instance
88    client: Elasticsearch,
89    /// Exporter options
90    options: ExporterOptions,
91    /// Constant exporter labels, e.g.: cluster
92    const_labels: HashMap<String, String>,
93
94    /// Node ID to node name map for adding extra metadata labels
95    /// {"U-WnGaTpRxucgde3miiDWw": "m1-supernode.example.com"}
96    nodes_metadata: metadata::IdToMetadata,
97
98    // Exporter metrics
99    metrics: ExporterMetrics,
100}
101
102/// Global metrics for Elasticsearch exporter
103#[derive(Debug)]
104pub struct ExporterMetrics {
105    /// Subsystem request histogram
106    subsystem_request_histogram: HistogramVec,
107    /// Cluster health status
108    cluster_health_status: IntGaugeVec,
109}
110
111impl Exporter {
112    /// Elasticsearch client instance
113    pub fn client(&self) -> &Elasticsearch {
114        &self.0.client
115    }
116
117    /// Elasticsearch cluster name
118    pub fn cluster_name(&self) -> &str {
119        &self.0.cluster_name
120    }
121
122    /// Exporter options
123    pub fn options(&self) -> &ExporterOptions {
124        &self.0.options
125    }
126
127    /// Exporter options
128    pub fn const_labels(&self) -> HashMap<String, String> {
129        self.0.const_labels.clone()
130    }
131
132    /// Node ID to node name map for adding extra metadata labels
133    /// {"U-WnGaTpRxucgde3miiDWw": "m1-supernode.example.com"}
134    pub fn nodes_metadata(&self) -> &metadata::IdToMetadata {
135        &self.0.nodes_metadata
136    }
137
138    /// Exporter metrics
139    pub fn metrics(&self) -> &ExporterMetrics {
140        &self.0.metrics
141    }
142
143    /// Spawn exporter
144    pub async fn new(options: ExporterOptions) -> Result<Self, Box<dyn std::error::Error>> {
145        let connection_pool = SingleNodeConnectionPool::new(options.elasticsearch_url.clone());
146
147        let mut transport =
148            TransportBuilder::new(connection_pool).timeout(options.elasticsearch_global_timeout);
149
150        let load_cert = || -> Result<Certificate, elasticsearch::Error> {
151            if let Some(ref cert_path) = options.elasticsearch_certificate_path {
152                let mut buf = Vec::new();
153                let _ = File::open(cert_path)?.read_to_end(&mut buf)?;
154                Certificate::from_pem(&buf)
155            } else {
156                panic!("Please provide --elasticsearch_certificate_path=CERTIFICATE_PATH flag");
157            }
158        };
159
160        match options.elasticsearch_certificate_validation {
161            Some(CertificateValidationOptions::Full) => {
162                let cert = load_cert()?;
163                transport = transport.cert_validation(CertificateValidation::Full(cert));
164            }
165            Some(CertificateValidationOptions::Partial) => {
166                let cert = load_cert()?;
167                transport = transport.cert_validation(CertificateValidation::Certificate(cert));
168            }
169            Some(CertificateValidationOptions::None) => {
170                transport = transport.cert_validation(CertificateValidation::None);
171            }
172            None => {}
173        }
174
175        let client = Elasticsearch::new(transport.build()?);
176        info!("Elasticsearch: ping");
177        let _ = client.ping().send().await?;
178
179        let nodes_metadata = if options.enable_metadata_refresh() {
180            metadata::node_data::build(&client).await?
181        } else {
182            info!("Skip metadata refresh");
183            // This will generate empty map
184            Default::default()
185        };
186
187        let cluster_name = metadata::cluster_name(&client).await?;
188
189        let mut const_labels = HashMap::new();
190        let _ = const_labels.insert("cluster".into(), cluster_name.clone());
191
192        let metrics = ExporterMetrics {
193            subsystem_request_histogram: HistogramVec::new(
194                HistogramOpts::new(
195                    "subsystem_request_duration_seconds",
196                    "The Elasticsearch subsystem request latencies in seconds.",
197                )
198                .namespace(options.exporter_metrics_namespace.as_str()),
199                &["subsystem", "cluster"],
200            )
201            .expect("valid histogram vec metric"),
202
203            cluster_health_status: IntGaugeVec::new(
204                Opts::new(
205                    "cluster_health_status",
206                    "Whether all primary and replica shards are allocated.",
207                )
208                .namespace(options.exporter_metrics_namespace.as_str()),
209                &["cluster", "color"],
210            )
211            .expect("valid prometheus metric"),
212        };
213
214        default_registry().register(Box::new(metrics.cluster_health_status.clone()))?;
215        default_registry().register(Box::new(metrics.subsystem_request_histogram.clone()))?;
216
217        Ok(Self(Arc::new(Inner {
218            cluster_name,
219            client,
220            options,
221            const_labels,
222            nodes_metadata,
223            metrics,
224        })))
225    }
226
227    /// Spawn collectors
228    pub async fn spawn(self) {
229        self.spawn_cat();
230        self.spawn_cluster();
231        self.spawn_nodes();
232        self.spawn_stats();
233
234        if self.options().enable_metadata_refresh() {
235            #[allow(clippy::let_underscore_future)]
236            let _ = tokio::spawn(metadata::node_data::poll(self));
237        }
238    }
239
240    fn spawn_cluster(&self) {
241        use metrics::_cluster::*;
242
243        is_metric_enabled!(self.clone(), health);
244    }
245
246    fn spawn_stats(&self) {
247        use metrics::_stats::*;
248
249        is_metric_enabled!(self.clone(), _all);
250    }
251
252    fn spawn_nodes(&self) {
253        use metrics::_nodes::*;
254
255        is_metric_enabled!(self.clone(), usage);
256        is_metric_enabled!(self.clone(), stats);
257        is_metric_enabled!(self.clone(), info);
258    }
259
260    // =^.^=
261    // /_cat/allocation
262    // /_cat/shards
263    // /_cat/indices
264    // /_cat/segments
265    // /_cat/nodes
266    // /_cat/recovery
267    // /_cat/health
268    // /_cat/pending_tasks
269    // /_cat/aliases
270    // /_cat/thread_pool
271    // /_cat/plugins
272    // /_cat/fielddata
273    // /_cat/nodeattrs
274    // /_cat/repositories
275    // /_cat/templates
276    // /_cat/transforms
277    fn spawn_cat(&self) {
278        use metrics::_cat::*;
279
280        is_metric_enabled!(self.clone(), allocation);
281        is_metric_enabled!(self.clone(), shards);
282        is_metric_enabled!(self.clone(), indices);
283        is_metric_enabled!(self.clone(), segments);
284        is_metric_enabled!(self.clone(), nodes);
285        is_metric_enabled!(self.clone(), recovery);
286        is_metric_enabled!(self.clone(), health);
287        is_metric_enabled!(self.clone(), pending_tasks);
288        is_metric_enabled!(self.clone(), aliases);
289        is_metric_enabled!(self.clone(), thread_pool);
290        is_metric_enabled!(self.clone(), plugins);
291        is_metric_enabled!(self.clone(), fielddata);
292        is_metric_enabled!(self.clone(), nodeattrs);
293        is_metric_enabled!(self.clone(), repositories);
294        is_metric_enabled!(self.clone(), templates);
295        is_metric_enabled!(self.clone(), transforms);
296    }
297
298    pub(crate) fn random_delay() -> u64 {
299        oorandom::Rand64::new(292).rand_range(150..800)
300    }
301}
302
303/// Convenience macro to poll metrics
304#[macro_export]
305macro_rules! is_metric_enabled {
306    ($exporter:expr, $metric:ident) => {
307        if $exporter.options().is_metric_enabled($metric::SUBSYSTEM) {
308            #[allow(clippy::let_underscore_future)]
309            let _ = tokio::spawn($metric::poll($exporter.clone()));
310        }
311    };
312}