Skip to main content

pgroles_operator/
observability.rs

1//! Operator health endpoints and OTLP metrics export.
2
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Instant;
7
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::response::IntoResponse;
11use axum::routing::get;
12use axum::{Router, serve};
13use opentelemetry::KeyValue;
14use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter};
15use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig};
16use opentelemetry_sdk::Resource;
17use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
18use tokio::net::TcpListener;
19
/// Name reported as the `service.name` resource attribute and used as the meter name.
const SERVICE_NAME: &str = "pgroles-operator";
21
/// Cloneable handle bundling the operator's readiness flag and optional metrics.
///
/// Both fields are reference-counted, so clones share the same readiness flag and
/// the same metric instruments.
#[derive(Clone)]
pub struct OperatorObservability {
    /// Readiness flag read by the `/readyz` handler.
    ready: Arc<AtomicBool>,
    /// Metric instruments; `None` when metrics export is not enabled.
    metrics: Option<Arc<Metrics>>,
}
27
/// The meter provider plus every instrument the operator records to.
struct Metrics {
    /// Provider that owns the export pipeline; kept so it can be shut down.
    provider: SdkMeterProvider,
    /// `pgroles.reconcile.total`: reconciliations by result and reason.
    reconcile_total: Counter<u64>,
    /// `pgroles.reconcile.duration`: reconciliation duration in milliseconds.
    reconcile_duration_ms: Histogram<u64>,
    /// `pgroles.reconcile.inflight`: reconciliations currently in flight.
    reconcile_inflight: UpDownCounter<i64>,
    /// `pgroles.plan.total`: plan-mode reconciliations by result.
    plan_total: Counter<u64>,
    /// `pgroles.plan.changes`: planned changes discovered in plan mode.
    plan_changes_total: Counter<u64>,
    /// `pgroles.lock_contention.total`: reconciliations delayed by lock contention.
    lock_contention_total: Counter<u64>,
    /// `pgroles.policy.conflicts`: conflicting policies targeting the same database.
    policy_conflicts_total: Counter<u64>,
    /// `pgroles.invalid_spec.total`: invalid policy specifications.
    invalid_spec_total: Counter<u64>,
    /// `pgroles.database.connection_failures`: database connection failures.
    database_connection_failures_total: Counter<u64>,
    /// `pgroles.apply.total`: apply transaction outcomes by result.
    apply_total: Counter<u64>,
    /// `pgroles.apply.statements`: SQL statements executed by applies.
    apply_statements_total: Counter<u64>,
}
42
43pub struct ReconcileGuard {
44    metrics: Option<Arc<Metrics>>,
45    started_at: Instant,
46}
47
48impl OperatorObservability {
49    pub fn from_env() -> anyhow::Result<Self> {
50        Ok(Self {
51            ready: Arc::new(AtomicBool::new(false)),
52            metrics: init_metrics_from_env()?,
53        })
54    }
55
56    pub fn mark_ready(&self) {
57        self.ready.store(true, Ordering::Relaxed);
58    }
59
60    pub fn mark_not_ready(&self) {
61        self.ready.store(false, Ordering::Relaxed);
62    }
63
64    pub fn start_reconcile(&self) -> ReconcileGuard {
65        if let Some(metrics) = &self.metrics {
66            metrics.reconcile_inflight.add(1, &[]);
67            ReconcileGuard {
68                metrics: Some(metrics.clone()),
69                started_at: Instant::now(),
70            }
71        } else {
72            ReconcileGuard {
73                metrics: None,
74                started_at: Instant::now(),
75            }
76        }
77    }
78
79    pub fn record_database_connection_failure(&self) {
80        if let Some(metrics) = &self.metrics {
81            metrics.database_connection_failures_total.add(1, &[]);
82        }
83    }
84
85    pub fn record_policy_conflict(&self) {
86        if let Some(metrics) = &self.metrics {
87            metrics.policy_conflicts_total.add(1, &[]);
88        }
89    }
90
91    pub fn record_lock_contention(&self) {
92        if let Some(metrics) = &self.metrics {
93            metrics.lock_contention_total.add(1, &[]);
94        }
95    }
96
97    pub fn record_plan_result(&self, result: &str) {
98        if let Some(metrics) = &self.metrics {
99            metrics
100                .plan_total
101                .add(1, &[KeyValue::new("result", result.to_string())]);
102        }
103    }
104
105    pub fn record_planned_changes(&self, changes: usize) {
106        if changes == 0 {
107            return;
108        }
109        if let Some(metrics) = &self.metrics {
110            metrics.plan_changes_total.add(changes as u64, &[]);
111        }
112    }
113
114    pub fn record_invalid_spec(&self) {
115        if let Some(metrics) = &self.metrics {
116            metrics.invalid_spec_total.add(1, &[]);
117        }
118    }
119
120    pub fn record_apply_result(&self, result: &str) {
121        if let Some(metrics) = &self.metrics {
122            metrics
123                .apply_total
124                .add(1, &[KeyValue::new("result", result.to_string())]);
125        }
126    }
127
128    pub fn record_apply_statements(&self, statements: usize) {
129        if statements == 0 {
130            return;
131        }
132        if let Some(metrics) = &self.metrics {
133            metrics.apply_statements_total.add(statements as u64, &[]);
134        }
135    }
136
137    pub fn shutdown(&self) -> anyhow::Result<()> {
138        if let Some(metrics) = &self.metrics {
139            metrics.provider.shutdown()?;
140        }
141        Ok(())
142    }
143}
144
145impl ReconcileGuard {
146    pub fn record_result(self, result: &str, reason: &str) {
147        if let Some(metrics) = &self.metrics {
148            metrics.reconcile_total.add(
149                1,
150                &[
151                    KeyValue::new("result", result.to_string()),
152                    KeyValue::new("reason", reason.to_string()),
153                ],
154            );
155            metrics
156                .reconcile_duration_ms
157                .record(self.started_at.elapsed().as_millis() as u64, &[]);
158        }
159    }
160}
161
162impl Drop for ReconcileGuard {
163    fn drop(&mut self) {
164        if let Some(metrics) = &self.metrics {
165            metrics.reconcile_inflight.add(-1, &[]);
166        }
167    }
168}
169
170pub async fn serve_health(
171    bind_addr: SocketAddr,
172    observability: OperatorObservability,
173) -> anyhow::Result<()> {
174    let listener = TcpListener::bind(bind_addr).await?;
175    let app = Router::new()
176        .route("/livez", get(livez))
177        .route("/readyz", get(readyz))
178        .with_state(observability);
179
180    serve(listener, app).await?;
181    Ok(())
182}
183
184fn init_metrics_from_env() -> anyhow::Result<Option<Arc<Metrics>>> {
185    if !otel_metrics_enabled() {
186        return Ok(None);
187    }
188
189    let exporter = MetricExporter::builder()
190        .with_tonic()
191        .with_protocol(Protocol::Grpc)
192        .build()?;
193
194    let reader = PeriodicReader::builder(exporter).build();
195    let provider = SdkMeterProvider::builder()
196        .with_reader(reader)
197        .with_resource(
198            Resource::builder_empty()
199                .with_attributes([
200                    KeyValue::new("service.name", SERVICE_NAME),
201                    KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
202                ])
203                .build(),
204        )
205        .build();
206
207    let meter = provider.meter(SERVICE_NAME);
208    Ok(Some(Arc::new(Metrics::new(provider, meter))))
209}
210
/// Decides from the environment whether OTLP metrics export should be set up.
///
/// Metrics are disabled when `OTEL_METRICS_EXPORTER` is `none` (compared
/// case-insensitively). Otherwise they are enabled when either
/// `OTEL_EXPORTER_OTLP_ENDPOINT` or `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` is set to
/// a non-empty value.
///
/// An empty value is treated the same as an unset variable, so an endpoint that is
/// exported but blank does not switch metrics on.
fn otel_metrics_enabled() -> bool {
    /// True when `name` is set to a non-empty value.
    fn is_configured(name: &str) -> bool {
        std::env::var_os(name).is_some_and(|value| !value.is_empty())
    }

    let exporter_disabled = std::env::var("OTEL_METRICS_EXPORTER")
        .is_ok_and(|value| value.eq_ignore_ascii_case("none"));
    if exporter_disabled {
        return false;
    }

    is_configured("OTEL_EXPORTER_OTLP_ENDPOINT")
        || is_configured("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
}
220
221impl Metrics {
222    fn new(provider: SdkMeterProvider, meter: Meter) -> Self {
223        Self {
224            provider,
225            reconcile_total: meter
226                .u64_counter("pgroles.reconcile.total")
227                .with_description("Total reconciliations by result and reason")
228                .build(),
229            reconcile_duration_ms: meter
230                .u64_histogram("pgroles.reconcile.duration")
231                .with_unit("ms")
232                .with_description("Reconciliation duration in milliseconds")
233                .build(),
234            reconcile_inflight: meter
235                .i64_up_down_counter("pgroles.reconcile.inflight")
236                .with_description("In-flight reconciliations")
237                .build(),
238            plan_total: meter
239                .u64_counter("pgroles.plan.total")
240                .with_description("Successful plan-mode reconciliations by result")
241                .build(),
242            plan_changes_total: meter
243                .u64_counter("pgroles.plan.changes")
244                .with_description("Planned changes discovered during plan-mode reconciliations")
245                .build(),
246            lock_contention_total: meter
247                .u64_counter("pgroles.lock_contention.total")
248                .with_description("Reconciliations delayed by per-database lock contention")
249                .build(),
250            policy_conflicts_total: meter
251                .u64_counter("pgroles.policy.conflicts")
252                .with_description("Conflicting policies targeting the same database")
253                .build(),
254            invalid_spec_total: meter
255                .u64_counter("pgroles.invalid_spec.total")
256                .with_description("Invalid PostgresPolicy specifications")
257                .build(),
258            database_connection_failures_total: meter
259                .u64_counter("pgroles.database.connection_failures")
260                .with_description("Database connection failures during reconciliation")
261                .build(),
262            apply_total: meter
263                .u64_counter("pgroles.apply.total")
264                .with_description("Apply transaction outcomes")
265                .build(),
266            apply_statements_total: meter
267                .u64_counter("pgroles.apply.statements")
268                .with_description("SQL statements executed during successful applies")
269                .build(),
270        }
271    }
272}
273
/// Liveness handler: always responds with the static body `"ok"`.
async fn livez() -> &'static str {
    const LIVE_BODY: &str = "ok";
    LIVE_BODY
}
277
278async fn readyz(State(observability): State<OperatorObservability>) -> impl IntoResponse {
279    if observability.ready.load(Ordering::Relaxed) {
280        (StatusCode::OK, "ready")
281    } else {
282        (StatusCode::SERVICE_UNAVAILABLE, "not ready")
283    }
284}
285
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use axum::extract::State;
    use axum::http::StatusCode;
    use axum::response::IntoResponse;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData, ResourceMetrics};
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};

    use super::{
        Metrics, OperatorObservability, ReconcileGuard, SERVICE_NAME, livez, otel_metrics_enabled,
        readyz,
    };

    /// Serialises the tests in this module that mutate process environment variables.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Builds an observability handle wired to an in-memory exporter, returning the
    /// provider and exporter so tests can flush and inspect what was recorded.
    fn test_observability() -> (
        OperatorObservability,
        SdkMeterProvider,
        InMemoryMetricExporter,
    ) {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone()).build();
        let provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = provider.meter(SERVICE_NAME);
        let metrics = Metrics::new(provider.clone(), meter);

        let observability = OperatorObservability {
            ready: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            metrics: Some(Arc::new(metrics)),
        };

        (observability, provider, exporter)
    }

    /// Removes the three OTEL variables that `otel_metrics_enabled` reads.
    fn clear_otel_env() {
        // SAFETY: callers hold ENV_LOCK, which serialises the tests in this module
        // that touch the environment.
        unsafe {
            std::env::remove_var("OTEL_EXPORTER_OTLP_ENDPOINT");
            std::env::remove_var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT");
            std::env::remove_var("OTEL_METRICS_EXPORTER");
        }
    }

    /// True when any exported metric is called `name`.
    fn metric_exists(metrics: &[ResourceMetrics], name: &str) -> bool {
        metrics
            .iter()
            .flat_map(|resource| resource.scope_metrics())
            .flat_map(|scope| scope.metrics())
            .any(|metric| metric.name() == name)
    }

    /// First data-point value of the `u64` sum metric called `name`, if there is one.
    fn u64_sum_value(metrics: &[ResourceMetrics], name: &str) -> Option<u64> {
        let metric = metrics
            .iter()
            .flat_map(|resource| resource.scope_metrics())
            .flat_map(|scope| scope.metrics())
            .find(|metric| metric.name() == name)?;

        let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else {
            return None;
        };
        sum.data_points().next().map(|point| point.value())
    }

    /// First data-point value of the `i64` sum metric called `name`, if there is one.
    fn i64_sum_value(metrics: &[ResourceMetrics], name: &str) -> Option<i64> {
        let metric = metrics
            .iter()
            .flat_map(|resource| resource.scope_metrics())
            .flat_map(|scope| scope.metrics())
            .find(|metric| metric.name() == name)?;

        let AggregatedMetrics::I64(MetricData::Sum(sum)) = metric.data() else {
            return None;
        };
        sum.data_points().next().map(|point| point.value())
    }

    #[test]
    fn otel_metrics_stay_disabled_without_endpoint() {
        let _guard = ENV_LOCK.lock().expect("env lock should not be poisoned");
        clear_otel_env();
        assert!(!otel_metrics_enabled());
    }

    #[test]
    fn otel_metrics_enable_with_explicit_endpoint() {
        let _guard = ENV_LOCK.lock().expect("env lock should not be poisoned");
        clear_otel_env();
        // SAFETY: ENV_LOCK is held, serialising this module's environment mutations.
        unsafe {
            std::env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", "http://collector:4317");
        }
        assert!(otel_metrics_enabled());
        // SAFETY: ENV_LOCK is still held.
        unsafe {
            std::env::remove_var("OTEL_EXPORTER_OTLP_ENDPOINT");
        }
    }

    #[tokio::test]
    async fn health_endpoints_reflect_readiness() {
        let (observability, _provider, _exporter) = test_observability();

        assert_eq!(livez().await, "ok");

        let before = readyz(State(observability.clone())).await.into_response();
        assert_eq!(before.status(), StatusCode::SERVICE_UNAVAILABLE);

        observability.mark_ready();
        let after = readyz(State(observability)).await.into_response();
        assert_eq!(after.status(), StatusCode::OK);
    }

    #[test]
    fn metrics_are_recorded_and_flushed() {
        let (observability, provider, exporter) = test_observability();

        let guard: ReconcileGuard = observability.start_reconcile();
        observability.record_lock_contention();
        observability.record_policy_conflict();
        observability.record_invalid_spec();
        observability.record_database_connection_failure();
        observability.record_plan_result("drift");
        observability.record_planned_changes(2);
        observability.record_apply_result("success");
        observability.record_apply_statements(4);
        guard.record_result("conflict", "ConflictingPolicy");

        provider.force_flush().expect("flush should succeed");

        let finished = exporter
            .get_finished_metrics()
            .expect("metrics should be exported");

        for name in ["pgroles.reconcile.total", "pgroles.reconcile.duration"] {
            assert!(metric_exists(&finished, name));
        }

        let expected_u64_sums = [
            ("pgroles.plan.total", 1),
            ("pgroles.plan.changes", 2),
            ("pgroles.lock_contention.total", 1),
            ("pgroles.policy.conflicts", 1),
            ("pgroles.invalid_spec.total", 1),
            ("pgroles.database.connection_failures", 1),
            ("pgroles.apply.statements", 4),
        ];
        for (name, expected) in expected_u64_sums {
            assert_eq!(u64_sum_value(&finished, name), Some(expected));
        }

        // The guard was consumed above, so the in-flight counter is back at zero.
        assert_eq!(
            i64_sum_value(&finished, "pgroles.reconcile.inflight"),
            Some(0)
        );
    }
}
444}