Skip to main content

pgroles_operator/
observability.rs

1//! Operator health endpoints and OTLP metrics export.
2
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Instant;
7
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::response::IntoResponse;
11use axum::routing::get;
12use axum::{Router, serve};
13use opentelemetry::KeyValue;
14use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter};
15use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig};
16use opentelemetry_sdk::Resource;
17use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
18use tokio::net::TcpListener;
19
20const SERVICE_NAME: &str = "pgroles-operator";
21
22#[derive(Clone)]
23pub struct OperatorObservability {
24    ready: Arc<AtomicBool>,
25    metrics: Option<Arc<Metrics>>,
26}
27
28struct Metrics {
29    provider: SdkMeterProvider,
30    reconcile_total: Counter<u64>,
31    reconcile_duration_ms: Histogram<u64>,
32    reconcile_inflight: UpDownCounter<i64>,
33    inspect_duration_ms: Histogram<u64>,
34    inspect_items_total: Counter<u64>,
35    wildcard_grantability_queries_total: Counter<u64>,
36    wildcard_unsatisfied_grants_total: Counter<u64>,
37    plan_total: Counter<u64>,
38    plan_changes_total: Counter<u64>,
39    lock_contention_total: Counter<u64>,
40    policy_conflicts_total: Counter<u64>,
41    invalid_spec_total: Counter<u64>,
42    database_connection_failures_total: Counter<u64>,
43    apply_total: Counter<u64>,
44    apply_statements_total: Counter<u64>,
45}
46
47pub struct ReconcileGuard {
48    metrics: Option<Arc<Metrics>>,
49    started_at: Instant,
50}
51
52impl OperatorObservability {
53    pub fn from_env() -> anyhow::Result<Self> {
54        Ok(Self {
55            ready: Arc::new(AtomicBool::new(false)),
56            metrics: init_metrics_from_env()?,
57        })
58    }
59
60    pub fn mark_ready(&self) {
61        self.ready.store(true, Ordering::Relaxed);
62    }
63
64    pub fn mark_not_ready(&self) {
65        self.ready.store(false, Ordering::Relaxed);
66    }
67
68    pub fn start_reconcile(&self) -> ReconcileGuard {
69        if let Some(metrics) = &self.metrics {
70            metrics.reconcile_inflight.add(1, &[]);
71            ReconcileGuard {
72                metrics: Some(metrics.clone()),
73                started_at: Instant::now(),
74            }
75        } else {
76            ReconcileGuard {
77                metrics: None,
78                started_at: Instant::now(),
79            }
80        }
81    }
82
83    pub fn record_database_connection_failure(&self) {
84        if let Some(metrics) = &self.metrics {
85            metrics.database_connection_failures_total.add(1, &[]);
86        }
87    }
88
89    pub fn record_policy_conflict(&self) {
90        if let Some(metrics) = &self.metrics {
91            metrics.policy_conflicts_total.add(1, &[]);
92        }
93    }
94
95    pub fn record_lock_contention(&self) {
96        if let Some(metrics) = &self.metrics {
97            metrics.lock_contention_total.add(1, &[]);
98        }
99    }
100
101    pub fn record_plan_result(&self, result: &str) {
102        if let Some(metrics) = &self.metrics {
103            metrics
104                .plan_total
105                .add(1, &[KeyValue::new("result", result.to_string())]);
106        }
107    }
108
109    pub fn record_planned_changes(&self, changes: usize) {
110        if changes == 0 {
111            return;
112        }
113        if let Some(metrics) = &self.metrics {
114            metrics.plan_changes_total.add(changes as u64, &[]);
115        }
116    }
117
118    pub fn record_invalid_spec(&self) {
119        if let Some(metrics) = &self.metrics {
120            metrics.invalid_spec_total.add(1, &[]);
121        }
122    }
123
124    pub fn record_inspection(&self, stats: &pgroles_inspect::InspectionStats) {
125        let Some(metrics) = &self.metrics else {
126            return;
127        };
128
129        for (phase, duration) in &stats.phase_durations {
130            metrics.inspect_duration_ms.record(
131                duration.as_millis() as u64,
132                &[KeyValue::new("phase", *phase)],
133            );
134        }
135
136        for (kind, count) in [
137            ("roles", stats.roles),
138            ("memberships", stats.memberships),
139            ("schemas", stats.schemas),
140            ("grants", stats.grants),
141            ("default_privileges", stats.default_privileges),
142            (
143                "wildcard_configured_grants",
144                stats.wildcard.configured_grants,
145            ),
146            (
147                "wildcard_configured_scopes",
148                stats.wildcard.configured_scopes,
149            ),
150            (
151                "wildcard_inventory_objects",
152                stats.wildcard.inventory_objects,
153            ),
154            (
155                "wildcard_unsatisfied_scopes",
156                stats.wildcard.unsatisfied_scopes,
157            ),
158            (
159                "wildcard_grantability_objects",
160                stats.wildcard.grantability_objects,
161            ),
162        ] {
163            if count > 0 {
164                metrics
165                    .inspect_items_total
166                    .add(count as u64, &[KeyValue::new("kind", kind)]);
167            }
168        }
169
170        if stats.wildcard.grantability_queries > 0 {
171            metrics
172                .wildcard_grantability_queries_total
173                .add(stats.wildcard.grantability_queries as u64, &[]);
174        }
175        if stats.wildcard.unsatisfied_grants > 0 {
176            metrics
177                .wildcard_unsatisfied_grants_total
178                .add(stats.wildcard.unsatisfied_grants as u64, &[]);
179        }
180    }
181
182    pub fn record_apply_result(&self, result: &str) {
183        if let Some(metrics) = &self.metrics {
184            metrics
185                .apply_total
186                .add(1, &[KeyValue::new("result", result.to_string())]);
187        }
188    }
189
190    pub fn record_apply_statements(&self, statements: usize) {
191        if statements == 0 {
192            return;
193        }
194        if let Some(metrics) = &self.metrics {
195            metrics.apply_statements_total.add(statements as u64, &[]);
196        }
197    }
198
199    pub fn shutdown(&self) -> anyhow::Result<()> {
200        if let Some(metrics) = &self.metrics {
201            metrics.provider.shutdown()?;
202        }
203        Ok(())
204    }
205}
206
207impl ReconcileGuard {
208    pub fn record_result(self, result: &str, reason: &str) {
209        if let Some(metrics) = &self.metrics {
210            metrics.reconcile_total.add(
211                1,
212                &[
213                    KeyValue::new("result", result.to_string()),
214                    KeyValue::new("reason", reason.to_string()),
215                ],
216            );
217            metrics
218                .reconcile_duration_ms
219                .record(self.started_at.elapsed().as_millis() as u64, &[]);
220        }
221    }
222}
223
224impl Drop for ReconcileGuard {
225    fn drop(&mut self) {
226        if let Some(metrics) = &self.metrics {
227            metrics.reconcile_inflight.add(-1, &[]);
228        }
229    }
230}
231
232pub async fn serve_health(
233    bind_addr: SocketAddr,
234    observability: OperatorObservability,
235) -> anyhow::Result<()> {
236    let listener = TcpListener::bind(bind_addr).await?;
237    let app = Router::new()
238        .route("/livez", get(livez))
239        .route("/readyz", get(readyz))
240        .with_state(observability);
241
242    serve(listener, app).await?;
243    Ok(())
244}
245
246fn init_metrics_from_env() -> anyhow::Result<Option<Arc<Metrics>>> {
247    if !otel_metrics_enabled() {
248        return Ok(None);
249    }
250
251    let exporter = MetricExporter::builder()
252        .with_tonic()
253        .with_protocol(Protocol::Grpc)
254        .build()?;
255
256    let reader = PeriodicReader::builder(exporter).build();
257    let provider = SdkMeterProvider::builder()
258        .with_reader(reader)
259        .with_resource(
260            Resource::builder_empty()
261                .with_attributes([
262                    KeyValue::new("service.name", SERVICE_NAME),
263                    KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
264                ])
265                .build(),
266        )
267        .build();
268
269    let meter = provider.meter(SERVICE_NAME);
270    Ok(Some(Arc::new(Metrics::new(provider, meter))))
271}
272
/// Decides from the environment whether OTLP metrics export should be set up.
///
/// Metrics are enabled when `OTEL_EXPORTER_OTLP_ENDPOINT` or
/// `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` carries a value, unless
/// `OTEL_METRICS_EXPORTER` is `none`.
///
/// Following the OpenTelemetry environment-variable conventions:
/// - an empty (or whitespace-only) value is treated the same as an unset
///   variable, so a templated `OTEL_EXPORTER_OTLP_ENDPOINT=""` does not turn
///   the exporter on with an unusable endpoint;
/// - the `none` exporter value is matched case-insensitively and ignoring
///   surrounding whitespace.
fn otel_metrics_enabled() -> bool {
    /// True when `name` is set to something other than an empty or
    /// whitespace-only value.
    fn has_value(name: &str) -> bool {
        std::env::var_os(name).is_some_and(|value| !value.to_string_lossy().trim().is_empty())
    }

    // A non-Unicode value cannot be "none", so it does not disable metrics.
    let exporter_disabled = std::env::var("OTEL_METRICS_EXPORTER")
        .is_ok_and(|exporter| exporter.trim().eq_ignore_ascii_case("none"));
    if exporter_disabled {
        return false;
    }

    has_value("OTEL_EXPORTER_OTLP_ENDPOINT") || has_value("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
}
282
283impl Metrics {
284    fn new(provider: SdkMeterProvider, meter: Meter) -> Self {
285        Self {
286            provider,
287            reconcile_total: meter
288                .u64_counter("pgroles.reconcile.total")
289                .with_description("Total reconciliations by result and reason")
290                .build(),
291            reconcile_duration_ms: meter
292                .u64_histogram("pgroles.reconcile.duration")
293                .with_unit("ms")
294                .with_description("Reconciliation duration in milliseconds")
295                .build(),
296            reconcile_inflight: meter
297                .i64_up_down_counter("pgroles.reconcile.inflight")
298                .with_description("In-flight reconciliations")
299                .build(),
300            inspect_duration_ms: meter
301                .u64_histogram("pgroles.inspect.duration")
302                .with_unit("ms")
303                .with_description("Database inspection phase duration in milliseconds")
304                .build(),
305            inspect_items_total: meter
306                .u64_counter("pgroles.inspect.items")
307                .with_description("Database inspection objects observed by kind")
308                .build(),
309            wildcard_grantability_queries_total: meter
310                .u64_counter("pgroles.wildcard.grantability_queries")
311                .with_description("Wildcard grantability catalog queries")
312                .build(),
313            wildcard_unsatisfied_grants_total: meter
314                .u64_counter("pgroles.wildcard.unsatisfied_grants")
315                .with_description("Wildcard grants missing privileges before grantability checks")
316                .build(),
317            plan_total: meter
318                .u64_counter("pgroles.plan.total")
319                .with_description("Successful plan-mode reconciliations by result")
320                .build(),
321            plan_changes_total: meter
322                .u64_counter("pgroles.plan.changes")
323                .with_description("Planned changes discovered during plan-mode reconciliations")
324                .build(),
325            lock_contention_total: meter
326                .u64_counter("pgroles.lock_contention.total")
327                .with_description("Reconciliations delayed by per-database lock contention")
328                .build(),
329            policy_conflicts_total: meter
330                .u64_counter("pgroles.policy.conflicts")
331                .with_description("Conflicting policies targeting the same database")
332                .build(),
333            invalid_spec_total: meter
334                .u64_counter("pgroles.invalid_spec.total")
335                .with_description("Invalid PostgresPolicy specifications")
336                .build(),
337            database_connection_failures_total: meter
338                .u64_counter("pgroles.database.connection_failures")
339                .with_description("Database connection failures during reconciliation")
340                .build(),
341            apply_total: meter
342                .u64_counter("pgroles.apply.total")
343                .with_description("Apply transaction outcomes")
344                .build(),
345            apply_statements_total: meter
346                .u64_counter("pgroles.apply.statements")
347                .with_description("SQL statements executed during successful applies")
348                .build(),
349        }
350    }
351}
352
/// Liveness handler: always answers with the body `ok`.
async fn livez() -> &'static str {
    const LIVE_BODY: &str = "ok";
    LIVE_BODY
}
356
357async fn readyz(State(observability): State<OperatorObservability>) -> impl IntoResponse {
358    if observability.ready.load(Ordering::Relaxed) {
359        (StatusCode::OK, "ready")
360    } else {
361        (StatusCode::SERVICE_UNAVAILABLE, "not ready")
362    }
363}
364
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
    use std::time::Duration;

    use axum::extract::State;
    use axum::http::StatusCode;
    use axum::response::IntoResponse;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData, ResourceMetrics};
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};

    use super::{
        Metrics, OperatorObservability, ReconcileGuard, SERVICE_NAME, livez, otel_metrics_enabled,
        readyz,
    };

    /// Serializes the tests in this module that mutate process-wide environment variables.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Every environment variable consulted by `otel_metrics_enabled`.
    const OTEL_ENV_VARS: [&str; 3] = [
        "OTEL_EXPORTER_OTLP_ENDPOINT",
        "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
        "OTEL_METRICS_EXPORTER",
    ];

    /// Exclusive, self-cleaning access to the OTEL environment variables.
    ///
    /// The variables are cleared on acquisition and again on drop, so a failing
    /// assertion cannot leak state into the next test.
    struct OtelEnv {
        _lock: MutexGuard<'static, ()>,
    }

    impl OtelEnv {
        fn acquire() -> Self {
            // A poisoned lock only means another env test panicked. The guarded
            // data is `()`, so recovering is always sound and keeps one failure
            // from cascading into unrelated tests.
            let lock = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
            Self::clear();
            Self { _lock: lock }
        }

        fn set(&self, name: &str, value: &str) {
            // SAFETY: `ENV_LOCK` is held for the lifetime of `self`, so no other
            // test in this module mutates the environment concurrently.
            // NOTE(review): assumes no other thread in the test binary reads
            // or writes the environment at the same time.
            unsafe {
                std::env::set_var(name, value);
            }
        }

        fn clear() {
            for name in OTEL_ENV_VARS {
                // SAFETY: only called while `ENV_LOCK` is held (from `acquire`
                // and from `drop`, which runs before the guard is released).
                unsafe {
                    std::env::remove_var(name);
                }
            }
        }
    }

    impl Drop for OtelEnv {
        fn drop(&mut self) {
            Self::clear();
        }
    }

    /// Builds an observability handle wired to an in-memory exporter.
    fn test_observability() -> (
        OperatorObservability,
        SdkMeterProvider,
        InMemoryMetricExporter,
    ) {
        let exporter = InMemoryMetricExporter::default();
        let provider = SdkMeterProvider::builder()
            .with_reader(PeriodicReader::builder(exporter.clone()).build())
            .build();
        let meter = provider.meter(SERVICE_NAME);
        let observability = OperatorObservability {
            ready: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            metrics: Some(Arc::new(Metrics::new(provider.clone(), meter))),
        };

        (observability, provider, exporter)
    }

    /// True when any exported scope contains a metric called `name`.
    fn metric_exists(metrics: &[ResourceMetrics], name: &str) -> bool {
        metrics.iter().any(|resource_metrics| {
            resource_metrics
                .scope_metrics()
                .flat_map(|scope_metrics| scope_metrics.metrics())
                .any(|metric| metric.name() == name)
        })
    }

    /// Sum of all data points of the u64 sum metric `name`, or `None` if absent.
    fn u64_sum_value(metrics: &[ResourceMetrics], name: &str) -> Option<u64> {
        let mut found = false;
        let total = metrics
            .iter()
            .flat_map(|resource_metrics| resource_metrics.scope_metrics())
            .flat_map(|scope_metrics| scope_metrics.metrics())
            .filter(|metric| metric.name() == name)
            .filter_map(|metric| match metric.data() {
                AggregatedMetrics::U64(MetricData::Sum(sum)) => {
                    found = true;
                    Some(
                        sum.data_points()
                            .map(|data_point| data_point.value())
                            .sum::<u64>(),
                    )
                }
                _ => None,
            })
            .sum();

        found.then_some(total)
    }

    /// First data point of the i64 sum metric `name`, or `None` if absent.
    fn i64_sum_value(metrics: &[ResourceMetrics], name: &str) -> Option<i64> {
        metrics
            .iter()
            .flat_map(|resource_metrics| resource_metrics.scope_metrics())
            .flat_map(|scope_metrics| scope_metrics.metrics())
            .find(|metric| metric.name() == name)
            .and_then(|metric| match metric.data() {
                AggregatedMetrics::I64(MetricData::Sum(sum)) => sum
                    .data_points()
                    .next()
                    .map(|data_point| data_point.value()),
                _ => None,
            })
    }

    #[test]
    fn otel_metrics_stay_disabled_without_endpoint() {
        let _env = OtelEnv::acquire();
        assert!(!otel_metrics_enabled());
    }

    #[test]
    fn otel_metrics_enable_with_explicit_endpoint() {
        let env = OtelEnv::acquire();
        env.set("OTEL_EXPORTER_OTLP_ENDPOINT", "http://collector:4317");
        assert!(otel_metrics_enabled());
    }

    #[test]
    fn otel_metrics_enable_with_metrics_specific_endpoint() {
        let env = OtelEnv::acquire();
        env.set("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", "http://collector:4317");
        assert!(otel_metrics_enabled());
    }

    #[test]
    fn otel_metrics_disabled_when_exporter_is_none() {
        let env = OtelEnv::acquire();
        env.set("OTEL_EXPORTER_OTLP_ENDPOINT", "http://collector:4317");
        env.set("OTEL_METRICS_EXPORTER", "none");
        assert!(!otel_metrics_enabled());
    }

    #[tokio::test]
    async fn health_endpoints_reflect_readiness() {
        let (observability, _provider, _exporter) = test_observability();

        assert_eq!(livez().await, "ok");

        let not_ready = readyz(State(observability.clone())).await.into_response();
        assert_eq!(not_ready.status(), StatusCode::SERVICE_UNAVAILABLE);

        observability.mark_ready();
        let ready = readyz(State(observability)).await.into_response();
        assert_eq!(ready.status(), StatusCode::OK);
    }

    #[test]
    fn metrics_are_recorded_and_flushed() {
        let (observability, provider, exporter) = test_observability();

        let guard: ReconcileGuard = observability.start_reconcile();
        observability.record_lock_contention();
        observability.record_policy_conflict();
        observability.record_invalid_spec();
        observability.record_database_connection_failure();
        observability.record_inspection(&pgroles_inspect::InspectionStats {
            roles: 3,
            memberships: 2,
            schemas: 1,
            grants: 5,
            default_privileges: 1,
            phase_durations: [
                ("roles", Duration::from_millis(4)),
                ("object_privileges", Duration::from_millis(12)),
            ]
            .into_iter()
            .collect(),
            wildcard: pgroles_inspect::WildcardInspectionStats {
                configured_grants: 2,
                configured_scopes: 1,
                inventory_objects: 100,
                unsatisfied_grants: 1,
                unsatisfied_scopes: 1,
                grantability_queries: 1,
                grantability_objects: 3,
            },
        });
        observability.record_plan_result("drift");
        observability.record_planned_changes(2);
        observability.record_apply_result("success");
        observability.record_apply_statements(4);
        guard.record_result("conflict", "ConflictingPolicy");

        provider.force_flush().expect("flush should succeed");

        let metrics = exporter
            .get_finished_metrics()
            .expect("metrics should be exported");

        assert!(metric_exists(&metrics, "pgroles.reconcile.total"));
        assert!(metric_exists(&metrics, "pgroles.reconcile.duration"));
        assert!(metric_exists(&metrics, "pgroles.inspect.duration"));
        // 3 + 2 + 1 + 5 + 1 base items plus 2 + 1 + 100 + 1 + 3 wildcard items.
        assert_eq!(u64_sum_value(&metrics, "pgroles.inspect.items"), Some(119));
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.wildcard.grantability_queries"),
            Some(1)
        );
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.wildcard.unsatisfied_grants"),
            Some(1)
        );
        assert_eq!(u64_sum_value(&metrics, "pgroles.plan.total"), Some(1));
        assert_eq!(u64_sum_value(&metrics, "pgroles.plan.changes"), Some(2));
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.lock_contention.total"),
            Some(1)
        );
        assert_eq!(u64_sum_value(&metrics, "pgroles.policy.conflicts"), Some(1));
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.invalid_spec.total"),
            Some(1)
        );
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.database.connection_failures"),
            Some(1)
        );
        assert_eq!(u64_sum_value(&metrics, "pgroles.apply.total"), Some(1));
        assert_eq!(u64_sum_value(&metrics, "pgroles.apply.statements"), Some(4));
        // The guard was consumed by `record_result`, so nothing is in flight.
        assert_eq!(
            i64_sum_value(&metrics, "pgroles.reconcile.inflight"),
            Some(0)
        );
    }
}