//! Elasticsearch metrics exporter library.
//!
//! The central type is [`Exporter`]. [`Exporter::new`] connects to
//! Elasticsearch and registers the exporter's self-monitoring metrics with the
//! default Prometheus registry; [`Exporter::spawn`] then starts one Tokio task
//! per enabled metric subsystem.

// Nightly feature gate. Nothing in this file uses it directly.
// NOTE(review): presumably required by one of the submodules — confirm before removing.
#![feature(hash_extract_if)]

// Every lint (or lint group) listed here is promoted to a hard compile error
// for the whole crate. Two practical consequences for contributors:
// * `missing_docs`: every public item needs a doc comment.
// * `unused_results`: ignored return values must be discarded explicitly with
//   `let _ = ...`.
#![deny(
    warnings,
    bad_style,
    dead_code,
    improper_ctypes,
    non_shorthand_field_patterns,
    no_mangle_generic_items,
    overflowing_literals,
    path_statements,
    patterns_in_fns_without_body,
    private_bounds,
    private_interfaces,
    unconditional_recursion,
    unused,
    unused_allocation,
    unused_comparisons,
    unused_parens,
    while_true,
    missing_debug_implementations,
    missing_docs,
    trivial_casts,
    unused_extern_crates,
    unused_qualifications,
    trivial_numeric_casts,
    unreachable_pub,
    unused_import_braces,
    unused_results,
    deprecated,
    unknown_lints,
    unreachable_code,
    unused_mut
)]
36
37#[macro_use]
38extern crate log;
39#[macro_use]
40extern crate serde_derive;
41use elasticsearch::cert::{Certificate, CertificateValidation};
42use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
43use elasticsearch::Elasticsearch;
44use prometheus::{default_registry, HistogramOpts, HistogramVec, IntGaugeVec, Opts};
45use std::collections::{BTreeMap, HashMap};
46use std::fs::File;
47use std::io::Read;
48use std::sync::Arc;
49use std::time::Duration;
50
/// Metric collection support.
// NOTE(review): summary inferred from the module name only — confirm against the module source.
pub mod collection;
/// Metric representation.
// NOTE(review): summary inferred from the module name only — confirm against the module source.
pub mod metric;

// Private module; only the two option types below are re-exported.
mod options;
pub use options::{CertificateValidationOptions, ExporterOptions};

/// Reserved names.
// NOTE(review): summary inferred from the module name only — confirm against the module source.
pub mod reserved;

/// Cluster and node metadata: provides `IdToMetadata`, `cluster_name` and the
/// `node_data` build/poll routines used by [`Exporter`].
pub mod metadata;

// Crate-internal pollers, grouped as `_cat`, `_cluster`, `_nodes` and `_stats`;
// each metric module exposes a `SUBSYSTEM` constant and a `poll` function that
// `is_metric_enabled!` relies on.
pub(crate) mod metrics;
66
/// Ordered (`BTreeMap`) string-to-string map used for labels.
// NOTE(review): presumably label name -> label value; confirm against the users of this alias.
pub type Labels = BTreeMap<String, String>;

/// Ordered map from a string key to a list of strings, used for collection labels.
// NOTE(review): the meaning of key and values is not visible in this file — confirm.
pub type CollectionLabels = BTreeMap<String, Vec<String>>;

/// Map from a string key to a polling interval.
// NOTE(review): keys are presumably subsystem names — confirm against `ExporterOptions`.
pub type ExporterPollIntervals = HashMap<String, Duration>;

/// Ordered map from a string key to an on/off flag.
// NOTE(review): keys are presumably subsystem names — confirm against `ExporterOptions`.
pub type ExporterMetricsSwitch = BTreeMap<String, bool>;
78
/// Handle to the exporter state.
///
/// The state lives behind an [`Arc`], so cloning an `Exporter` only bumps a
/// reference count; every clone observes the same client, options and metrics.
#[derive(Debug, Clone)]
pub struct Exporter(Arc<Inner>);
82
/// State shared by all clones of an [`Exporter`]; populated once in
/// [`Exporter::new`].
#[derive(Debug)]
struct Inner {
    /// Cluster name obtained from `metadata::cluster_name` at construction time.
    cluster_name: String,
    /// Elasticsearch client built from the configured transport.
    client: Elasticsearch,
    /// Options the exporter was created with.
    options: ExporterOptions,
    /// Constant labels; `Exporter::new` inserts a single `cluster` entry holding
    /// the cluster name.
    const_labels: HashMap<String, String>,

    /// Node metadata: built via `metadata::node_data::build` when
    /// `options.enable_metadata_refresh()` is true, otherwise the type's default.
    nodes_metadata: metadata::IdToMetadata,

    /// Self-monitoring metrics registered with the default Prometheus registry.
    metrics: ExporterMetrics,
}
101
/// Metrics describing the exporter itself; created and registered in
/// [`Exporter::new`].
#[derive(Debug)]
pub struct ExporterMetrics {
    /// Histogram `subsystem_request_duration_seconds`, labelled by `subsystem`
    /// and `cluster`.
    subsystem_request_histogram: HistogramVec,
    /// Gauge `cluster_health_status`, labelled by `cluster` and `color`.
    cluster_health_status: IntGaugeVec,
}
110
111impl Exporter {
112 pub fn client(&self) -> &Elasticsearch {
114 &self.0.client
115 }
116
117 pub fn cluster_name(&self) -> &str {
119 &self.0.cluster_name
120 }
121
122 pub fn options(&self) -> &ExporterOptions {
124 &self.0.options
125 }
126
127 pub fn const_labels(&self) -> HashMap<String, String> {
129 self.0.const_labels.clone()
130 }
131
132 pub fn nodes_metadata(&self) -> &metadata::IdToMetadata {
135 &self.0.nodes_metadata
136 }
137
138 pub fn metrics(&self) -> &ExporterMetrics {
140 &self.0.metrics
141 }
142
143 pub async fn new(options: ExporterOptions) -> Result<Self, Box<dyn std::error::Error>> {
145 let connection_pool = SingleNodeConnectionPool::new(options.elasticsearch_url.clone());
146
147 let mut transport =
148 TransportBuilder::new(connection_pool).timeout(options.elasticsearch_global_timeout);
149
150 let load_cert = || -> Result<Certificate, elasticsearch::Error> {
151 if let Some(ref cert_path) = options.elasticsearch_certificate_path {
152 let mut buf = Vec::new();
153 let _ = File::open(cert_path)?.read_to_end(&mut buf)?;
154 Certificate::from_pem(&buf)
155 } else {
156 panic!("Please provide --elasticsearch_certificate_path=CERTIFICATE_PATH flag");
157 }
158 };
159
160 match options.elasticsearch_certificate_validation {
161 Some(CertificateValidationOptions::Full) => {
162 let cert = load_cert()?;
163 transport = transport.cert_validation(CertificateValidation::Full(cert));
164 }
165 Some(CertificateValidationOptions::Partial) => {
166 let cert = load_cert()?;
167 transport = transport.cert_validation(CertificateValidation::Certificate(cert));
168 }
169 Some(CertificateValidationOptions::None) => {
170 transport = transport.cert_validation(CertificateValidation::None);
171 }
172 None => {}
173 }
174
175 let client = Elasticsearch::new(transport.build()?);
176 info!("Elasticsearch: ping");
177 let _ = client.ping().send().await?;
178
179 let nodes_metadata = if options.enable_metadata_refresh() {
180 metadata::node_data::build(&client).await?
181 } else {
182 info!("Skip metadata refresh");
183 Default::default()
185 };
186
187 let cluster_name = metadata::cluster_name(&client).await?;
188
189 let mut const_labels = HashMap::new();
190 let _ = const_labels.insert("cluster".into(), cluster_name.clone());
191
192 let metrics = ExporterMetrics {
193 subsystem_request_histogram: HistogramVec::new(
194 HistogramOpts::new(
195 "subsystem_request_duration_seconds",
196 "The Elasticsearch subsystem request latencies in seconds.",
197 )
198 .namespace(options.exporter_metrics_namespace.as_str()),
199 &["subsystem", "cluster"],
200 )
201 .expect("valid histogram vec metric"),
202
203 cluster_health_status: IntGaugeVec::new(
204 Opts::new(
205 "cluster_health_status",
206 "Whether all primary and replica shards are allocated.",
207 )
208 .namespace(options.exporter_metrics_namespace.as_str()),
209 &["cluster", "color"],
210 )
211 .expect("valid prometheus metric"),
212 };
213
214 default_registry().register(Box::new(metrics.cluster_health_status.clone()))?;
215 default_registry().register(Box::new(metrics.subsystem_request_histogram.clone()))?;
216
217 Ok(Self(Arc::new(Inner {
218 cluster_name,
219 client,
220 options,
221 const_labels,
222 nodes_metadata,
223 metrics,
224 })))
225 }
226
227 pub async fn spawn(self) {
229 self.spawn_cat();
230 self.spawn_cluster();
231 self.spawn_nodes();
232 self.spawn_stats();
233
234 if self.options().enable_metadata_refresh() {
235 #[allow(clippy::let_underscore_future)]
236 let _ = tokio::spawn(metadata::node_data::poll(self));
237 }
238 }
239
240 fn spawn_cluster(&self) {
241 use metrics::_cluster::*;
242
243 is_metric_enabled!(self.clone(), health);
244 }
245
246 fn spawn_stats(&self) {
247 use metrics::_stats::*;
248
249 is_metric_enabled!(self.clone(), _all);
250 }
251
252 fn spawn_nodes(&self) {
253 use metrics::_nodes::*;
254
255 is_metric_enabled!(self.clone(), usage);
256 is_metric_enabled!(self.clone(), stats);
257 is_metric_enabled!(self.clone(), info);
258 }
259
260 fn spawn_cat(&self) {
278 use metrics::_cat::*;
279
280 is_metric_enabled!(self.clone(), allocation);
281 is_metric_enabled!(self.clone(), shards);
282 is_metric_enabled!(self.clone(), indices);
283 is_metric_enabled!(self.clone(), segments);
284 is_metric_enabled!(self.clone(), nodes);
285 is_metric_enabled!(self.clone(), recovery);
286 is_metric_enabled!(self.clone(), health);
287 is_metric_enabled!(self.clone(), pending_tasks);
288 is_metric_enabled!(self.clone(), aliases);
289 is_metric_enabled!(self.clone(), thread_pool);
290 is_metric_enabled!(self.clone(), plugins);
291 is_metric_enabled!(self.clone(), fielddata);
292 is_metric_enabled!(self.clone(), nodeattrs);
293 is_metric_enabled!(self.clone(), repositories);
294 is_metric_enabled!(self.clone(), templates);
295 is_metric_enabled!(self.clone(), transforms);
296 }
297
298 pub(crate) fn random_delay() -> u64 {
299 oorandom::Rand64::new(292).rand_range(150..800)
300 }
301}
302
/// Spawns the poller of one metric module when its subsystem is enabled.
///
/// * `$exporter` — an expression yielding an [`Exporter`], owned or borrowed.
///   It is evaluated exactly once and only borrowed, so a variable passed
///   here stays usable afterwards.
/// * `$metric` — a module in scope at the call site that provides a
///   `SUBSYSTEM` constant and a `poll` function taking an `Exporter`.
///
/// When `options().is_metric_enabled($metric::SUBSYSTEM)` is true, a clone of
/// the exporter is handed to `$metric::poll` and the resulting future is
/// spawned on Tokio as a detached task; otherwise nothing happens.
#[macro_export]
macro_rules! is_metric_enabled {
    ($exporter:expr, $metric:ident) => {{
        // Bind the argument once so an expression such as `self.clone()` is
        // not evaluated a second time for the spawn below. The explicit type
        // lets both `Exporter` and `&Exporter` arguments coerce to one borrow.
        let exporter: &$crate::Exporter = &$exporter;
        if exporter.options().is_metric_enabled($metric::SUBSYSTEM) {
            // Dropping the join handle on purpose: the task runs detached.
            #[allow(clippy::let_underscore_future)]
            let _ = tokio::spawn($metric::poll(exporter.clone()));
        }
    }};
}