1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use hyper::Uri;
use lazy_static::lazy_static;
use prometheus::*;
use structopt::StructOpt;

mod consul;
mod http;
mod patroni;

use std::collections::HashSet;
use std::net::SocketAddr;
use std::time::Duration;

lazy_static! {
    // Every gauge is labelled with the node's `server` name and its current
    // Patroni `role`. Each static is registered lazily on first access, and
    // the `unwrap` panics if registration fails.

    /// PostgreSQL version reported for each node.
    pub static ref GAUGE_PG_VERSION: GaugeVec = register_gauge_vec!(
        "patroni_postgres_version",
        "PostgreSQL version",
        &["server", "role"]
    )
    .unwrap();

    /// Info-style gauge: the Patroni version is carried in the `version` label.
    pub static ref GAUGE_PATRONI_VERSION: GaugeVec = register_gauge_vec!(
        "patroni_version",
        "Patroni version",
        &["server", "role", "version"]
    )
    .unwrap();

    /// Whether PostgreSQL is running on the node.
    pub static ref GAUGE_RUNNING: GaugeVec = register_gauge_vec!(
        "patroni_running",
        "Is PostgreSQL running",
        &["server", "role"]
    )
    .unwrap();

    /// Whether the node is waiting for a restart.
    pub static ref GAUGE_PENDING_RESTART: GaugeVec = register_gauge_vec!(
        "patroni_pending_restart",
        "Node is pending a restart",
        &["server", "role"]
    )
    .unwrap();

    /// Current timeline number reported for the node.
    pub static ref GAUGE_TIMELINE: GaugeVec = register_gauge_vec!(
        "patroni_timeline_number",
        "Patroni timeline number",
        &["server", "role"]
    )
    .unwrap();

    /// Count of replication slots reported for the node.
    pub static ref GAUGE_REPL_SLOTS: GaugeVec = register_gauge_vec!(
        "patroni_replication_slots",
        "Postgres replication slots connected",
        &["server", "role"]
    )
    .unwrap();
}

// Command-line configuration for the exporter.
// NOTE: with structopt, the `///` doc comments in this block are not only
// documentation; they are rendered as the `--help` text shown to users.
// Explanatory notes for maintainers therefore use plain `//` comments.
/// Export Patroni metrics to Prometheus
#[derive(StructOpt)]
#[structopt(name = "patroni_exporter")]
struct Args {
    /// Consul URL
    // Falls back to $CONSUL_HTTP_ADDR when -c/--consul is not given.
    #[structopt(short = "c", long = "consul", env = "CONSUL_HTTP_ADDR")]
    consul_url: Uri,

    /// Consul token
    // Optional: falls back to $CONSUL_HTTP_TOKEN; `None` when neither is set.
    #[structopt(short = "t", long = "token", env = "CONSUL_HTTP_TOKEN")]
    consul_token: Option<String>,

    /// Patroni service name
    // Consul service name queried on every poll (see `consul.service` in run()).
    // Falls back to $PATRONI_SERVICE.
    #[structopt(short = "s", long = "service", env = "PATRONI_SERVICE")]
    service: String,

    /// HTTP listen address
    // Required and parsed as a `SocketAddr` (ip:port). Unlike the options
    // above, it has no environment-variable fallback.
    #[structopt(short = "l", long = "listen")]
    listen_addr: SocketAddr,

    /// Logging verbosity
    // Counts repeated -v flags: 0 => INFO, 1 => DEBUG, 2+ => TRACE (mapped in run()).
    #[structopt(short = "v", parse(from_occurrences))]
    verbose: u8
}

pub async fn run() {
    let args = Args::from_args();

    // Init logging
    // Derive verbosity from args
    let log_level = match args.verbose {
        0 => tracing::Level::INFO,
        1 => tracing::Level::DEBUG,
        _ => tracing::Level::TRACE
    };

    let subscriber = tracing_subscriber::FmtSubscriber::builder()
        .with_max_level(log_level)
        .finish();

    tracing::subscriber::set_global_default(subscriber)
        .expect("setting tracing default failed");

    // Start HTTP server
    tokio::spawn(http::listen(args.listen_addr));
    
    let consul = consul::ConsulClient::new(&args.consul_url, &args.consul_token);
    
    // Keep track of Consul failures and bail after a few in a row
    let mut consul_fails = 0usize;

    // Also keep track of the nodes we're monitoring as these could change
    // HashMap<$node, $missing>

    tracing::info!("Starting monitoring");
    loop {
        tracing::debug!("Querying Consul");
        match consul.service(&args.service).await {
            Ok(patroni) => {
                for (server, state) in &patroni {
                    let is_running = match state.is_running() {
                        true => 1.0,
                        false => 0.0
                    };
                    GAUGE_RUNNING.with_label_values(&[server, &state.role()]).set(is_running);

                    let pending_restart = match state.pending_restart() {
                        true => 1.0,
                        false => 0.0
                    };
                    GAUGE_PENDING_RESTART.with_label_values(&[server, &state.role()]).set(pending_restart);
                    
                    GAUGE_PG_VERSION.with_label_values(&[server, &state.role()]).set(state.postgres_version() as f64);
                    GAUGE_PATRONI_VERSION.with_label_values(&[server, &state.role(), state.patroni_version()]).set(1.0);
                    
                    GAUGE_TIMELINE.with_label_values(&[server, &state.role()]).set(state.timeline() as f64);
                    GAUGE_REPL_SLOTS.with_label_values(&[server, &state.role()]).set(state.repl_slots() as f64);
                }

                // Reset our error counter
                consul_fails = 0;
            },
            Err(error) => {
                tracing::error!(%error, "unable to query consul");
                consul_fails += 1;

                if consul_fails >= 5 {
                    tracing::error!("persistant error connecting to Consul, quitting");
                    break;
                }
            }
        };

        // Sleep for 10 secs
        tokio::time::delay_for(Duration::from_secs(30)).await;
    }
}