1use std::net::SocketAddr;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Instant;
7
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::response::IntoResponse;
11use axum::routing::get;
12use axum::{Router, serve};
13use opentelemetry::KeyValue;
14use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter};
15use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig};
16use opentelemetry_sdk::Resource;
17use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
18use tokio::net::TcpListener;
19
20const SERVICE_NAME: &str = "pgroles-operator";
21
22#[derive(Clone)]
23pub struct OperatorObservability {
24 ready: Arc<AtomicBool>,
25 metrics: Option<Arc<Metrics>>,
26}
27
/// Handles to every OpenTelemetry instrument the operator records, together
/// with the meter provider that created them.
///
/// NOTE: struct fields drop in declaration order, so `provider` is dropped
/// before the instruments. Review that ordering before reordering fields.
struct Metrics {
    /// Provider that owns the instruments; kept so
    /// `OperatorObservability::shutdown` can shut it down.
    provider: SdkMeterProvider,
    /// `pgroles.reconcile.total`: finished reconciliations, labelled `result` and `reason`.
    reconcile_total: Counter<u64>,
    /// `pgroles.reconcile.duration` (ms): time from `start_reconcile` to `record_result`.
    reconcile_duration_ms: Histogram<u64>,
    /// `pgroles.reconcile.inflight`: reconciliations whose `ReconcileGuard` is still alive.
    reconcile_inflight: UpDownCounter<i64>,
    /// `pgroles.inspect.duration` (ms): per-phase inspection time, labelled `phase`.
    inspect_duration_ms: Histogram<u64>,
    /// `pgroles.inspect.items`: inspected objects, labelled `kind`.
    inspect_items_total: Counter<u64>,
    /// `pgroles.wildcard.grantability_queries`: wildcard grantability catalog queries.
    wildcard_grantability_queries_total: Counter<u64>,
    /// `pgroles.wildcard.unsatisfied_grants`: wildcard grants missing privileges.
    wildcard_unsatisfied_grants_total: Counter<u64>,
    /// `pgroles.plan.total`: plan-mode outcomes, labelled `result`.
    plan_total: Counter<u64>,
    /// `pgroles.plan.changes`: changes found by plan-mode reconciliations.
    plan_changes_total: Counter<u64>,
    /// `pgroles.lock_contention.total`: reconciliations delayed by lock contention.
    lock_contention_total: Counter<u64>,
    /// `pgroles.policy.conflicts`: conflicting policies detected.
    policy_conflicts_total: Counter<u64>,
    /// `pgroles.invalid_spec.total`: invalid PostgresPolicy specifications.
    invalid_spec_total: Counter<u64>,
    /// `pgroles.database.connection_failures`: database connection failures.
    database_connection_failures_total: Counter<u64>,
    /// `pgroles.apply.total`: apply transaction outcomes, labelled `result`.
    apply_total: Counter<u64>,
    /// `pgroles.apply.statements`: SQL statements executed by applies.
    apply_statements_total: Counter<u64>,
}
46
47pub struct ReconcileGuard {
48 metrics: Option<Arc<Metrics>>,
49 started_at: Instant,
50}
51
52impl OperatorObservability {
53 pub fn from_env() -> anyhow::Result<Self> {
54 Ok(Self {
55 ready: Arc::new(AtomicBool::new(false)),
56 metrics: init_metrics_from_env()?,
57 })
58 }
59
60 pub fn mark_ready(&self) {
61 self.ready.store(true, Ordering::Relaxed);
62 }
63
64 pub fn mark_not_ready(&self) {
65 self.ready.store(false, Ordering::Relaxed);
66 }
67
68 pub fn start_reconcile(&self) -> ReconcileGuard {
69 if let Some(metrics) = &self.metrics {
70 metrics.reconcile_inflight.add(1, &[]);
71 ReconcileGuard {
72 metrics: Some(metrics.clone()),
73 started_at: Instant::now(),
74 }
75 } else {
76 ReconcileGuard {
77 metrics: None,
78 started_at: Instant::now(),
79 }
80 }
81 }
82
83 pub fn record_database_connection_failure(&self) {
84 if let Some(metrics) = &self.metrics {
85 metrics.database_connection_failures_total.add(1, &[]);
86 }
87 }
88
89 pub fn record_policy_conflict(&self) {
90 if let Some(metrics) = &self.metrics {
91 metrics.policy_conflicts_total.add(1, &[]);
92 }
93 }
94
95 pub fn record_lock_contention(&self) {
96 if let Some(metrics) = &self.metrics {
97 metrics.lock_contention_total.add(1, &[]);
98 }
99 }
100
101 pub fn record_plan_result(&self, result: &str) {
102 if let Some(metrics) = &self.metrics {
103 metrics
104 .plan_total
105 .add(1, &[KeyValue::new("result", result.to_string())]);
106 }
107 }
108
109 pub fn record_planned_changes(&self, changes: usize) {
110 if changes == 0 {
111 return;
112 }
113 if let Some(metrics) = &self.metrics {
114 metrics.plan_changes_total.add(changes as u64, &[]);
115 }
116 }
117
118 pub fn record_invalid_spec(&self) {
119 if let Some(metrics) = &self.metrics {
120 metrics.invalid_spec_total.add(1, &[]);
121 }
122 }
123
124 pub fn record_inspection(&self, stats: &pgroles_inspect::InspectionStats) {
125 let Some(metrics) = &self.metrics else {
126 return;
127 };
128
129 for (phase, duration) in &stats.phase_durations {
130 metrics.inspect_duration_ms.record(
131 duration.as_millis() as u64,
132 &[KeyValue::new("phase", *phase)],
133 );
134 }
135
136 for (kind, count) in [
137 ("roles", stats.roles),
138 ("memberships", stats.memberships),
139 ("schemas", stats.schemas),
140 ("grants", stats.grants),
141 ("default_privileges", stats.default_privileges),
142 (
143 "wildcard_configured_grants",
144 stats.wildcard.configured_grants,
145 ),
146 (
147 "wildcard_configured_scopes",
148 stats.wildcard.configured_scopes,
149 ),
150 (
151 "wildcard_inventory_objects",
152 stats.wildcard.inventory_objects,
153 ),
154 (
155 "wildcard_unsatisfied_scopes",
156 stats.wildcard.unsatisfied_scopes,
157 ),
158 (
159 "wildcard_grantability_objects",
160 stats.wildcard.grantability_objects,
161 ),
162 ] {
163 if count > 0 {
164 metrics
165 .inspect_items_total
166 .add(count as u64, &[KeyValue::new("kind", kind)]);
167 }
168 }
169
170 if stats.wildcard.grantability_queries > 0 {
171 metrics
172 .wildcard_grantability_queries_total
173 .add(stats.wildcard.grantability_queries as u64, &[]);
174 }
175 if stats.wildcard.unsatisfied_grants > 0 {
176 metrics
177 .wildcard_unsatisfied_grants_total
178 .add(stats.wildcard.unsatisfied_grants as u64, &[]);
179 }
180 }
181
182 pub fn record_apply_result(&self, result: &str) {
183 if let Some(metrics) = &self.metrics {
184 metrics
185 .apply_total
186 .add(1, &[KeyValue::new("result", result.to_string())]);
187 }
188 }
189
190 pub fn record_apply_statements(&self, statements: usize) {
191 if statements == 0 {
192 return;
193 }
194 if let Some(metrics) = &self.metrics {
195 metrics.apply_statements_total.add(statements as u64, &[]);
196 }
197 }
198
199 pub fn shutdown(&self) -> anyhow::Result<()> {
200 if let Some(metrics) = &self.metrics {
201 metrics.provider.shutdown()?;
202 }
203 Ok(())
204 }
205}
206
207impl ReconcileGuard {
208 pub fn record_result(self, result: &str, reason: &str) {
209 if let Some(metrics) = &self.metrics {
210 metrics.reconcile_total.add(
211 1,
212 &[
213 KeyValue::new("result", result.to_string()),
214 KeyValue::new("reason", reason.to_string()),
215 ],
216 );
217 metrics
218 .reconcile_duration_ms
219 .record(self.started_at.elapsed().as_millis() as u64, &[]);
220 }
221 }
222}
223
224impl Drop for ReconcileGuard {
225 fn drop(&mut self) {
226 if let Some(metrics) = &self.metrics {
227 metrics.reconcile_inflight.add(-1, &[]);
228 }
229 }
230}
231
232pub async fn serve_health(
233 bind_addr: SocketAddr,
234 observability: OperatorObservability,
235) -> anyhow::Result<()> {
236 let listener = TcpListener::bind(bind_addr).await?;
237 let app = Router::new()
238 .route("/livez", get(livez))
239 .route("/readyz", get(readyz))
240 .with_state(observability);
241
242 serve(listener, app).await?;
243 Ok(())
244}
245
246fn init_metrics_from_env() -> anyhow::Result<Option<Arc<Metrics>>> {
247 if !otel_metrics_enabled() {
248 return Ok(None);
249 }
250
251 let exporter = MetricExporter::builder()
252 .with_tonic()
253 .with_protocol(Protocol::Grpc)
254 .build()?;
255
256 let reader = PeriodicReader::builder(exporter).build();
257 let provider = SdkMeterProvider::builder()
258 .with_reader(reader)
259 .with_resource(
260 Resource::builder_empty()
261 .with_attributes([
262 KeyValue::new("service.name", SERVICE_NAME),
263 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
264 ])
265 .build(),
266 )
267 .build();
268
269 let meter = provider.meter(SERVICE_NAME);
270 Ok(Some(Arc::new(Metrics::new(provider, meter))))
271}
272
/// Decides whether OTLP metrics export should be configured, based on the
/// standard OpenTelemetry environment variables.
///
/// Metrics are enabled only when `OTEL_EXPORTER_OTLP_ENDPOINT` or
/// `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` is set. They are disabled when
/// `OTEL_METRICS_EXPORTER` is `none` or `OTEL_SDK_DISABLED` is `true`.
///
/// Following the OpenTelemetry environment-variable specification, values are
/// handled as follows:
/// - empty (or whitespace-only) values are treated as unset;
/// - `none` / `true` are matched case-insensitively.
fn otel_metrics_enabled() -> bool {
    metrics_enabled_with(|name| {
        std::env::var_os(name).map(|value| value.to_string_lossy().into_owned())
    })
}

/// Core of `otel_metrics_enabled`, parameterised over the variable lookup so
/// the decision can be tested without mutating the process environment.
fn metrics_enabled_with(lookup: impl Fn(&str) -> Option<String>) -> bool {
    // The OTel spec requires an empty value to be interpreted exactly like an
    // unset variable (e.g. Helm charts often render `OTEL_EXPORTER_OTLP_ENDPOINT: ""`).
    let configured = |name: &str| -> Option<String> {
        lookup(name)
            .map(|value| value.trim().to_owned())
            .filter(|value| !value.is_empty())
    };

    let sdk_disabled =
        configured("OTEL_SDK_DISABLED").is_some_and(|value| value.eq_ignore_ascii_case("true"));
    let exporter_disabled =
        configured("OTEL_METRICS_EXPORTER").is_some_and(|value| value.eq_ignore_ascii_case("none"));
    if sdk_disabled || exporter_disabled {
        return false;
    }

    configured("OTEL_EXPORTER_OTLP_ENDPOINT").is_some()
        || configured("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT").is_some()
}
282
283impl Metrics {
284 fn new(provider: SdkMeterProvider, meter: Meter) -> Self {
285 Self {
286 provider,
287 reconcile_total: meter
288 .u64_counter("pgroles.reconcile.total")
289 .with_description("Total reconciliations by result and reason")
290 .build(),
291 reconcile_duration_ms: meter
292 .u64_histogram("pgroles.reconcile.duration")
293 .with_unit("ms")
294 .with_description("Reconciliation duration in milliseconds")
295 .build(),
296 reconcile_inflight: meter
297 .i64_up_down_counter("pgroles.reconcile.inflight")
298 .with_description("In-flight reconciliations")
299 .build(),
300 inspect_duration_ms: meter
301 .u64_histogram("pgroles.inspect.duration")
302 .with_unit("ms")
303 .with_description("Database inspection phase duration in milliseconds")
304 .build(),
305 inspect_items_total: meter
306 .u64_counter("pgroles.inspect.items")
307 .with_description("Database inspection objects observed by kind")
308 .build(),
309 wildcard_grantability_queries_total: meter
310 .u64_counter("pgroles.wildcard.grantability_queries")
311 .with_description("Wildcard grantability catalog queries")
312 .build(),
313 wildcard_unsatisfied_grants_total: meter
314 .u64_counter("pgroles.wildcard.unsatisfied_grants")
315 .with_description("Wildcard grants missing privileges before grantability checks")
316 .build(),
317 plan_total: meter
318 .u64_counter("pgroles.plan.total")
319 .with_description("Successful plan-mode reconciliations by result")
320 .build(),
321 plan_changes_total: meter
322 .u64_counter("pgroles.plan.changes")
323 .with_description("Planned changes discovered during plan-mode reconciliations")
324 .build(),
325 lock_contention_total: meter
326 .u64_counter("pgroles.lock_contention.total")
327 .with_description("Reconciliations delayed by per-database lock contention")
328 .build(),
329 policy_conflicts_total: meter
330 .u64_counter("pgroles.policy.conflicts")
331 .with_description("Conflicting policies targeting the same database")
332 .build(),
333 invalid_spec_total: meter
334 .u64_counter("pgroles.invalid_spec.total")
335 .with_description("Invalid PostgresPolicy specifications")
336 .build(),
337 database_connection_failures_total: meter
338 .u64_counter("pgroles.database.connection_failures")
339 .with_description("Database connection failures during reconciliation")
340 .build(),
341 apply_total: meter
342 .u64_counter("pgroles.apply.total")
343 .with_description("Apply transaction outcomes")
344 .build(),
345 apply_statements_total: meter
346 .u64_counter("pgroles.apply.statements")
347 .with_description("SQL statements executed during successful applies")
348 .build(),
349 }
350 }
351}
352
/// Liveness probe handler; always answers with the plain-text body `ok`.
///
/// It performs no checks, so a response only shows that the HTTP server can
/// still serve requests.
async fn livez() -> &'static str {
    const ALIVE: &str = "ok";
    ALIVE
}
356
357async fn readyz(State(observability): State<OperatorObservability>) -> impl IntoResponse {
358 if observability.ready.load(Ordering::Relaxed) {
359 (StatusCode::OK, "ready")
360 } else {
361 (StatusCode::SERVICE_UNAVAILABLE, "not ready")
362 }
363}
364
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard};
    use std::time::Duration;

    use axum::extract::State;
    use axum::http::StatusCode;
    use axum::response::IntoResponse;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData, ResourceMetrics};
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};

    use super::{
        Metrics, OperatorObservability, ReconcileGuard, SERVICE_NAME, livez, otel_metrics_enabled,
        readyz,
    };

    /// Serialises the tests in this module that mutate OTel environment variables.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Every environment variable the enablement tests may read or write.
    const OTEL_ENV_VARS: [&str; 4] = [
        "OTEL_EXPORTER_OTLP_ENDPOINT",
        "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
        "OTEL_METRICS_EXPORTER",
        "OTEL_SDK_DISABLED",
    ];

    /// Removes all `OTEL_ENV_VARS` from the process environment.
    fn clear_otel_env() {
        for name in OTEL_ENV_VARS {
            // SAFETY: only called while ENV_LOCK is held, which serialises the
            // tests here that touch these variables; other tests are assumed
            // not to read them concurrently.
            unsafe { std::env::remove_var(name) };
        }
    }

    /// Holds `ENV_LOCK` and clears `OTEL_ENV_VARS` both on creation and on
    /// drop. Each test therefore starts clean, and a failing assertion cannot
    /// leak configuration into later tests.
    struct CleanOtelEnv {
        _lock: MutexGuard<'static, ()>,
    }

    impl CleanOtelEnv {
        fn acquire() -> Self {
            // A panicking env test poisons the lock, but its Drop has already
            // restored the environment, so recovering the guard is safe and
            // avoids cascading failures.
            let lock = ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            clear_otel_env();
            Self { _lock: lock }
        }
    }

    impl Drop for CleanOtelEnv {
        fn drop(&mut self) {
            // Runs before `_lock` is released, so cleanup happens under the lock.
            clear_otel_env();
        }
    }

    fn test_observability() -> (
        OperatorObservability,
        SdkMeterProvider,
        InMemoryMetricExporter,
    ) {
        let exporter = InMemoryMetricExporter::default();
        let provider = SdkMeterProvider::builder()
            .with_reader(PeriodicReader::builder(exporter.clone()).build())
            .build();
        let meter = provider.meter(SERVICE_NAME);
        let observability = OperatorObservability {
            ready: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            metrics: Some(Arc::new(Metrics::new(provider.clone(), meter))),
        };

        (observability, provider, exporter)
    }

    fn metric_exists(metrics: &[ResourceMetrics], name: &str) -> bool {
        metrics.iter().any(|resource_metrics| {
            resource_metrics
                .scope_metrics()
                .flat_map(|scope_metrics| scope_metrics.metrics())
                .any(|metric| metric.name() == name)
        })
    }

    /// Sums every data point of the u64 sum metric `name`; `None` if absent.
    fn u64_sum_value(metrics: &[ResourceMetrics], name: &str) -> Option<u64> {
        let mut found = false;
        let total = metrics
            .iter()
            .flat_map(|resource_metrics| resource_metrics.scope_metrics())
            .flat_map(|scope_metrics| scope_metrics.metrics())
            .filter(|metric| metric.name() == name)
            .filter_map(|metric| match metric.data() {
                AggregatedMetrics::U64(MetricData::Sum(sum)) => {
                    found = true;
                    Some(
                        sum.data_points()
                            .map(|data_point| data_point.value())
                            .sum::<u64>(),
                    )
                }
                _ => None,
            })
            .sum();

        found.then_some(total)
    }

    /// Sums every data point of the i64 sum metric `name`; `None` if absent.
    /// Mirrors `u64_sum_value` so attributed data points are not ignored.
    fn i64_sum_value(metrics: &[ResourceMetrics], name: &str) -> Option<i64> {
        let mut found = false;
        let total = metrics
            .iter()
            .flat_map(|resource_metrics| resource_metrics.scope_metrics())
            .flat_map(|scope_metrics| scope_metrics.metrics())
            .filter(|metric| metric.name() == name)
            .filter_map(|metric| match metric.data() {
                AggregatedMetrics::I64(MetricData::Sum(sum)) => {
                    found = true;
                    Some(
                        sum.data_points()
                            .map(|data_point| data_point.value())
                            .sum::<i64>(),
                    )
                }
                _ => None,
            })
            .sum();

        found.then_some(total)
    }

    #[test]
    fn otel_metrics_stay_disabled_without_endpoint() {
        let _env = CleanOtelEnv::acquire();
        assert!(!otel_metrics_enabled());
    }

    #[test]
    fn otel_metrics_enable_with_explicit_endpoint() {
        let _env = CleanOtelEnv::acquire();
        // SAFETY: `_env` holds ENV_LOCK and removes the variable again on drop.
        unsafe {
            std::env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", "http://collector:4317");
        }
        assert!(otel_metrics_enabled());
    }

    #[tokio::test]
    async fn health_endpoints_reflect_readiness() {
        let (observability, _provider, _exporter) = test_observability();

        assert_eq!(livez().await, "ok");

        let not_ready = readyz(State(observability.clone())).await.into_response();
        assert_eq!(not_ready.status(), StatusCode::SERVICE_UNAVAILABLE);

        observability.mark_ready();
        let ready = readyz(State(observability)).await.into_response();
        assert_eq!(ready.status(), StatusCode::OK);
    }

    #[test]
    fn metrics_are_recorded_and_flushed() {
        let (observability, provider, exporter) = test_observability();

        let guard: ReconcileGuard = observability.start_reconcile();
        observability.record_lock_contention();
        observability.record_policy_conflict();
        observability.record_invalid_spec();
        observability.record_database_connection_failure();
        observability.record_inspection(&pgroles_inspect::InspectionStats {
            roles: 3,
            memberships: 2,
            schemas: 1,
            grants: 5,
            default_privileges: 1,
            phase_durations: [
                ("roles", Duration::from_millis(4)),
                ("object_privileges", Duration::from_millis(12)),
            ]
            .into_iter()
            .collect(),
            wildcard: pgroles_inspect::WildcardInspectionStats {
                configured_grants: 2,
                configured_scopes: 1,
                inventory_objects: 100,
                unsatisfied_grants: 1,
                unsatisfied_scopes: 1,
                grantability_queries: 1,
                grantability_objects: 3,
            },
        });
        observability.record_plan_result("drift");
        observability.record_planned_changes(2);
        observability.record_apply_result("success");
        observability.record_apply_statements(4);
        guard.record_result("conflict", "ConflictingPolicy");

        provider.force_flush().expect("flush should succeed");

        let metrics = exporter
            .get_finished_metrics()
            .expect("metrics should be exported");

        assert_eq!(u64_sum_value(&metrics, "pgroles.reconcile.total"), Some(1));
        assert!(metric_exists(&metrics, "pgroles.reconcile.duration"));
        assert!(metric_exists(&metrics, "pgroles.inspect.duration"));
        // 3 + 2 + 1 + 5 + 1 plain objects, plus 2 + 1 + 100 + 1 + 3 wildcard items.
        assert_eq!(u64_sum_value(&metrics, "pgroles.inspect.items"), Some(119));
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.wildcard.grantability_queries"),
            Some(1)
        );
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.wildcard.unsatisfied_grants"),
            Some(1)
        );
        assert_eq!(u64_sum_value(&metrics, "pgroles.plan.total"), Some(1));
        assert_eq!(u64_sum_value(&metrics, "pgroles.plan.changes"), Some(2));
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.lock_contention.total"),
            Some(1)
        );
        assert_eq!(u64_sum_value(&metrics, "pgroles.policy.conflicts"), Some(1));
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.invalid_spec.total"),
            Some(1)
        );
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.database.connection_failures"),
            Some(1)
        );
        assert_eq!(u64_sum_value(&metrics, "pgroles.apply.total"), Some(1));
        assert_eq!(u64_sum_value(&metrics, "pgroles.apply.statements"), Some(4));
        assert_eq!(
            i64_sum_value(&metrics, "pgroles.reconcile.inflight"),
            Some(0)
        );
    }
}