1use hyper::Uri;
2use lazy_static::lazy_static;
3use prometheus::*;
4use structopt::StructOpt;
5
6mod consul;
7mod http;
8mod patroni;
9
use std::collections::HashSet;
use std::net::SocketAddr;
use std::time::Duration;
12
13lazy_static! {
14 pub static ref GAUGE_PG_VERSION: GaugeVec = register_gauge_vec!("patroni_postgres_version", "PostgreSQL version", &["server", "role"]).unwrap();
15 pub static ref GAUGE_PATRONI_VERSION: GaugeVec = register_gauge_vec!("patroni_version", "Patroni version", &["server", "role", "version"]).unwrap();
16 pub static ref GAUGE_RUNNING: GaugeVec = register_gauge_vec!("patroni_running", "Is PostgreSQL running", &["server", "role"]).unwrap();
17 pub static ref GAUGE_PENDING_RESTART: GaugeVec = register_gauge_vec!("patroni_pending_restart", "Node is pending a restart", &["server", "role"]).unwrap();
18
19 pub static ref GAUGE_TIMELINE: GaugeVec = register_gauge_vec!("patroni_timeline_number", "Patroni timeline number", &["server", "role"]).unwrap();
20 pub static ref GAUGE_REPL_SLOTS: GaugeVec = register_gauge_vec!("patroni_replication_slots", "Postgres replication slots connected", &["server", "role"]).unwrap();
21}
22
23#[derive(StructOpt)]
25#[structopt(name = "patroni_exporter")]
26struct Args {
27 #[structopt(short = "c", long = "consul", env = "CONSUL_HTTP_ADDR")]
29 consul_url: Uri,
30
31 #[structopt(short = "t", long = "token", env = "CONSUL_HTTP_TOKEN")]
33 consul_token: Option<String>,
34
35 #[structopt(short = "s", long = "service", env = "PATRONI_SERVICE")]
37 service: String,
38
39 #[structopt(short = "l", long = "listen")]
41 listen_addr: SocketAddr,
42
43 #[structopt(short = "v", parse(from_occurrences))]
45 verbose: u8
46}
47
48pub async fn run() {
49 let args = Args::from_args();
50
51 let log_level = match args.verbose {
54 0 => tracing::Level::INFO,
55 1 => tracing::Level::DEBUG,
56 _ => tracing::Level::TRACE
57 };
58
59 let subscriber = tracing_subscriber::FmtSubscriber::builder()
60 .with_max_level(log_level)
61 .finish();
62
63 tracing::subscriber::set_global_default(subscriber)
64 .expect("setting tracing default failed");
65
66 tokio::spawn(http::listen(args.listen_addr));
68
69 let consul = consul::ConsulClient::new(&args.consul_url, &args.consul_token);
70
71 let mut consul_fails = 0usize;
73
74 tracing::info!("Starting monitoring");
78 loop {
79 tracing::debug!("Querying Consul");
80 match consul.service(&args.service).await {
81 Ok(patroni) => {
82 for (server, state) in &patroni {
83 let is_running = match state.is_running() {
84 true => 1.0,
85 false => 0.0
86 };
87 GAUGE_RUNNING.with_label_values(&[server, &state.role()]).set(is_running);
88
89 let pending_restart = match state.pending_restart() {
90 true => 1.0,
91 false => 0.0
92 };
93 GAUGE_PENDING_RESTART.with_label_values(&[server, &state.role()]).set(pending_restart);
94
95 GAUGE_PG_VERSION.with_label_values(&[server, &state.role()]).set(state.postgres_version() as f64);
96 GAUGE_PATRONI_VERSION.with_label_values(&[server, &state.role(), state.patroni_version()]).set(1.0);
97
98 GAUGE_TIMELINE.with_label_values(&[server, &state.role()]).set(state.timeline() as f64);
99 GAUGE_REPL_SLOTS.with_label_values(&[server, &state.role()]).set(state.repl_slots() as f64);
100 }
101
102 consul_fails = 0;
104 },
105 Err(error) => {
106 tracing::error!(%error, "unable to query consul");
107 consul_fails += 1;
108
109 if consul_fails >= 5 {
110 tracing::error!("persistant error connecting to Consul, quitting");
111 break;
112 }
113 }
114 };
115
116 tokio::time::delay_for(Duration::from_secs(30)).await;
118 }
119}