1use std::net::SocketAddr;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Instant;
7
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::response::IntoResponse;
11use axum::routing::get;
12use axum::{Router, serve};
13use opentelemetry::KeyValue;
14use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter};
15use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig};
16use opentelemetry_sdk::Resource;
17use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
18use tokio::net::TcpListener;
19
/// Name used both as the `service.name` resource attribute and as the meter (instrumentation
/// scope) name for every metric this module exports.
const SERVICE_NAME: &str = "pgroles-operator";
21
/// Shared observability handle for the operator: the readiness state served by the health
/// endpoint plus the optional OTLP metric instruments.
///
/// Both fields are `Arc`s, so clones are cheap and share the same readiness flag and
/// instruments.
#[derive(Clone)]
pub struct OperatorObservability {
    // Readiness flag read by `/readyz`; `from_env` initialises it to `false`.
    ready: Arc<AtomicBool>,
    // Metric instruments, or `None` when OTLP metrics export is not enabled, in which case
    // every `record_*` method is a no-op.
    metrics: Option<Arc<Metrics>>,
}
27
/// The operator's metric instruments, together with the provider that owns their export
/// pipeline. Instrument names below are the ones assigned in `Metrics::new`.
struct Metrics {
    // Provider backing every instrument; kept so `OperatorObservability::shutdown` can shut it
    // down.
    provider: SdkMeterProvider,
    // `pgroles.reconcile.total`: reconciliations, attributed by `result` and `reason`.
    reconcile_total: Counter<u64>,
    // `pgroles.reconcile.duration` (unit `ms`): elapsed time per recorded reconciliation.
    reconcile_duration_ms: Histogram<u64>,
    // `pgroles.reconcile.inflight`: +1 in `start_reconcile`, -1 when the guard is dropped.
    reconcile_inflight: UpDownCounter<i64>,
    // `pgroles.plan.total`: plan-mode outcomes, attributed by `result`.
    plan_total: Counter<u64>,
    // `pgroles.plan.changes`: planned changes discovered in plan mode.
    plan_changes_total: Counter<u64>,
    // `pgroles.lock_contention.total`: reconciliations delayed by per-database lock contention.
    lock_contention_total: Counter<u64>,
    // `pgroles.policy.conflicts`: conflicting policies targeting the same database.
    policy_conflicts_total: Counter<u64>,
    // `pgroles.invalid_spec.total`: invalid PostgresPolicy specifications.
    invalid_spec_total: Counter<u64>,
    // `pgroles.database.connection_failures`: database connection failures.
    database_connection_failures_total: Counter<u64>,
    // `pgroles.apply.total`: apply transaction outcomes, attributed by `result`.
    apply_total: Counter<u64>,
    // `pgroles.apply.statements`: SQL statements executed during applies.
    apply_statements_total: Counter<u64>,
}
42
43pub struct ReconcileGuard {
44 metrics: Option<Arc<Metrics>>,
45 started_at: Instant,
46}
47
48impl OperatorObservability {
49 pub fn from_env() -> anyhow::Result<Self> {
50 Ok(Self {
51 ready: Arc::new(AtomicBool::new(false)),
52 metrics: init_metrics_from_env()?,
53 })
54 }
55
56 pub fn mark_ready(&self) {
57 self.ready.store(true, Ordering::Relaxed);
58 }
59
60 pub fn mark_not_ready(&self) {
61 self.ready.store(false, Ordering::Relaxed);
62 }
63
64 pub fn start_reconcile(&self) -> ReconcileGuard {
65 if let Some(metrics) = &self.metrics {
66 metrics.reconcile_inflight.add(1, &[]);
67 ReconcileGuard {
68 metrics: Some(metrics.clone()),
69 started_at: Instant::now(),
70 }
71 } else {
72 ReconcileGuard {
73 metrics: None,
74 started_at: Instant::now(),
75 }
76 }
77 }
78
79 pub fn record_database_connection_failure(&self) {
80 if let Some(metrics) = &self.metrics {
81 metrics.database_connection_failures_total.add(1, &[]);
82 }
83 }
84
85 pub fn record_policy_conflict(&self) {
86 if let Some(metrics) = &self.metrics {
87 metrics.policy_conflicts_total.add(1, &[]);
88 }
89 }
90
91 pub fn record_lock_contention(&self) {
92 if let Some(metrics) = &self.metrics {
93 metrics.lock_contention_total.add(1, &[]);
94 }
95 }
96
97 pub fn record_plan_result(&self, result: &str) {
98 if let Some(metrics) = &self.metrics {
99 metrics
100 .plan_total
101 .add(1, &[KeyValue::new("result", result.to_string())]);
102 }
103 }
104
105 pub fn record_planned_changes(&self, changes: usize) {
106 if changes == 0 {
107 return;
108 }
109 if let Some(metrics) = &self.metrics {
110 metrics.plan_changes_total.add(changes as u64, &[]);
111 }
112 }
113
114 pub fn record_invalid_spec(&self) {
115 if let Some(metrics) = &self.metrics {
116 metrics.invalid_spec_total.add(1, &[]);
117 }
118 }
119
120 pub fn record_apply_result(&self, result: &str) {
121 if let Some(metrics) = &self.metrics {
122 metrics
123 .apply_total
124 .add(1, &[KeyValue::new("result", result.to_string())]);
125 }
126 }
127
128 pub fn record_apply_statements(&self, statements: usize) {
129 if statements == 0 {
130 return;
131 }
132 if let Some(metrics) = &self.metrics {
133 metrics.apply_statements_total.add(statements as u64, &[]);
134 }
135 }
136
137 pub fn shutdown(&self) -> anyhow::Result<()> {
138 if let Some(metrics) = &self.metrics {
139 metrics.provider.shutdown()?;
140 }
141 Ok(())
142 }
143}
144
145impl ReconcileGuard {
146 pub fn record_result(self, result: &str, reason: &str) {
147 if let Some(metrics) = &self.metrics {
148 metrics.reconcile_total.add(
149 1,
150 &[
151 KeyValue::new("result", result.to_string()),
152 KeyValue::new("reason", reason.to_string()),
153 ],
154 );
155 metrics
156 .reconcile_duration_ms
157 .record(self.started_at.elapsed().as_millis() as u64, &[]);
158 }
159 }
160}
161
162impl Drop for ReconcileGuard {
163 fn drop(&mut self) {
164 if let Some(metrics) = &self.metrics {
165 metrics.reconcile_inflight.add(-1, &[]);
166 }
167 }
168}
169
170pub async fn serve_health(
171 bind_addr: SocketAddr,
172 observability: OperatorObservability,
173) -> anyhow::Result<()> {
174 let listener = TcpListener::bind(bind_addr).await?;
175 let app = Router::new()
176 .route("/livez", get(livez))
177 .route("/readyz", get(readyz))
178 .with_state(observability);
179
180 serve(listener, app).await?;
181 Ok(())
182}
183
184fn init_metrics_from_env() -> anyhow::Result<Option<Arc<Metrics>>> {
185 if !otel_metrics_enabled() {
186 return Ok(None);
187 }
188
189 let exporter = MetricExporter::builder()
190 .with_tonic()
191 .with_protocol(Protocol::Grpc)
192 .build()?;
193
194 let reader = PeriodicReader::builder(exporter).build();
195 let provider = SdkMeterProvider::builder()
196 .with_reader(reader)
197 .with_resource(
198 Resource::builder_empty()
199 .with_attributes([
200 KeyValue::new("service.name", SERVICE_NAME),
201 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
202 ])
203 .build(),
204 )
205 .build();
206
207 let meter = provider.meter(SERVICE_NAME);
208 Ok(Some(Arc::new(Metrics::new(provider, meter))))
209}
210
/// Decides whether OTLP metrics export should be configured from the environment.
///
/// Returns `false` when `OTEL_METRICS_EXPORTER` is `none`, compared case-insensitively with
/// surrounding whitespace ignored. Otherwise returns `true` only if
/// `OTEL_EXPORTER_OTLP_ENDPOINT` or `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` holds a non-blank
/// value.
///
/// The OpenTelemetry environment-variable specification requires an empty value to be treated
/// like an unset variable. A blank endpoint (common when templated manifests render
/// `value: ""`) therefore does not enable export.
fn otel_metrics_enabled() -> bool {
    // `var_os` keeps the original behaviour for non-UTF-8 values (treated as set); only empty
    // or whitespace-only values count as unset.
    fn has_value(name: &str) -> bool {
        std::env::var_os(name).is_some_and(|value| !value.to_string_lossy().trim().is_empty())
    }

    let exporter_disabled = std::env::var("OTEL_METRICS_EXPORTER")
        .is_ok_and(|value| value.trim().eq_ignore_ascii_case("none"));
    if exporter_disabled {
        return false;
    }

    has_value("OTEL_EXPORTER_OTLP_ENDPOINT") || has_value("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
}
220
221impl Metrics {
222 fn new(provider: SdkMeterProvider, meter: Meter) -> Self {
223 Self {
224 provider,
225 reconcile_total: meter
226 .u64_counter("pgroles.reconcile.total")
227 .with_description("Total reconciliations by result and reason")
228 .build(),
229 reconcile_duration_ms: meter
230 .u64_histogram("pgroles.reconcile.duration")
231 .with_unit("ms")
232 .with_description("Reconciliation duration in milliseconds")
233 .build(),
234 reconcile_inflight: meter
235 .i64_up_down_counter("pgroles.reconcile.inflight")
236 .with_description("In-flight reconciliations")
237 .build(),
238 plan_total: meter
239 .u64_counter("pgroles.plan.total")
240 .with_description("Successful plan-mode reconciliations by result")
241 .build(),
242 plan_changes_total: meter
243 .u64_counter("pgroles.plan.changes")
244 .with_description("Planned changes discovered during plan-mode reconciliations")
245 .build(),
246 lock_contention_total: meter
247 .u64_counter("pgroles.lock_contention.total")
248 .with_description("Reconciliations delayed by per-database lock contention")
249 .build(),
250 policy_conflicts_total: meter
251 .u64_counter("pgroles.policy.conflicts")
252 .with_description("Conflicting policies targeting the same database")
253 .build(),
254 invalid_spec_total: meter
255 .u64_counter("pgroles.invalid_spec.total")
256 .with_description("Invalid PostgresPolicy specifications")
257 .build(),
258 database_connection_failures_total: meter
259 .u64_counter("pgroles.database.connection_failures")
260 .with_description("Database connection failures during reconciliation")
261 .build(),
262 apply_total: meter
263 .u64_counter("pgroles.apply.total")
264 .with_description("Apply transaction outcomes")
265 .build(),
266 apply_statements_total: meter
267 .u64_counter("pgroles.apply.statements")
268 .with_description("SQL statements executed during successful applies")
269 .build(),
270 }
271 }
272}
273
/// Liveness probe handler; always answers `ok`.
async fn livez() -> &'static str {
    const ALIVE: &str = "ok";
    ALIVE
}
277
278async fn readyz(State(observability): State<OperatorObservability>) -> impl IntoResponse {
279 if observability.ready.load(Ordering::Relaxed) {
280 (StatusCode::OK, "ready")
281 } else {
282 (StatusCode::SERVICE_UNAVAILABLE, "not ready")
283 }
284}
285
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

    use axum::extract::State;
    use axum::http::StatusCode;
    use axum::response::IntoResponse;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData, ResourceMetrics};
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};

    use super::{
        Metrics, OperatorObservability, ReconcileGuard, SERVICE_NAME, livez, otel_metrics_enabled,
        readyz,
    };

    /// Serialises tests that mutate process-wide environment variables.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires `ENV_LOCK`, recovering from poisoning so a single failing env test does not
    /// cascade into spurious failures of every other env test.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Builds an observability handle whose metrics go to an in-memory exporter, returning the
    /// provider (for `force_flush`) and the exporter (to inspect what was exported).
    fn test_observability() -> (
        OperatorObservability,
        SdkMeterProvider,
        InMemoryMetricExporter,
    ) {
        let exporter = InMemoryMetricExporter::default();
        let provider = SdkMeterProvider::builder()
            .with_reader(PeriodicReader::builder(exporter.clone()).build())
            .build();
        let meter = provider.meter(SERVICE_NAME);
        let observability = OperatorObservability {
            ready: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            metrics: Some(Arc::new(Metrics::new(provider.clone(), meter))),
        };

        (observability, provider, exporter)
    }

    /// Returns whether any exported metric is named `name`.
    fn metric_exists(metrics: &[ResourceMetrics], name: &str) -> bool {
        metrics.iter().any(|resource_metrics| {
            resource_metrics
                .scope_metrics()
                .flat_map(|scope_metrics| scope_metrics.metrics())
                .any(|metric| metric.name() == name)
        })
    }

    /// Value of the first data point of the u64 sum named `name`, if present.
    fn u64_sum_value(metrics: &[ResourceMetrics], name: &str) -> Option<u64> {
        metrics
            .iter()
            .flat_map(|resource_metrics| resource_metrics.scope_metrics())
            .flat_map(|scope_metrics| scope_metrics.metrics())
            .find(|metric| metric.name() == name)
            .and_then(|metric| match metric.data() {
                AggregatedMetrics::U64(MetricData::Sum(sum)) => sum
                    .data_points()
                    .next()
                    .map(|data_point| data_point.value()),
                _ => None,
            })
    }

    /// Value of the first data point of the i64 sum named `name`, if present.
    fn i64_sum_value(metrics: &[ResourceMetrics], name: &str) -> Option<i64> {
        metrics
            .iter()
            .flat_map(|resource_metrics| resource_metrics.scope_metrics())
            .flat_map(|scope_metrics| scope_metrics.metrics())
            .find(|metric| metric.name() == name)
            .and_then(|metric| match metric.data() {
                AggregatedMetrics::I64(MetricData::Sum(sum)) => sum
                    .data_points()
                    .next()
                    .map(|data_point| data_point.value()),
                _ => None,
            })
    }

    #[test]
    fn otel_metrics_stay_disabled_without_endpoint() {
        let _guard = lock_env();
        unsafe {
            std::env::remove_var("OTEL_EXPORTER_OTLP_ENDPOINT");
            std::env::remove_var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT");
            std::env::remove_var("OTEL_METRICS_EXPORTER");
        }
        assert!(!otel_metrics_enabled());
    }

    #[test]
    fn otel_metrics_enable_with_explicit_endpoint() {
        let _guard = lock_env();
        unsafe {
            std::env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", "http://collector:4317");
            std::env::remove_var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT");
            std::env::remove_var("OTEL_METRICS_EXPORTER");
        }
        assert!(otel_metrics_enabled());
        unsafe {
            std::env::remove_var("OTEL_EXPORTER_OTLP_ENDPOINT");
        }
    }

    #[tokio::test]
    async fn health_endpoints_reflect_readiness() {
        let (observability, _provider, _exporter) = test_observability();

        assert_eq!(livez().await, "ok");

        let not_ready = readyz(State(observability.clone())).await.into_response();
        assert_eq!(not_ready.status(), StatusCode::SERVICE_UNAVAILABLE);

        observability.mark_ready();
        let ready = readyz(State(observability)).await.into_response();
        assert_eq!(ready.status(), StatusCode::OK);
    }

    #[test]
    fn metrics_are_recorded_and_flushed() {
        let (observability, provider, exporter) = test_observability();

        let guard: ReconcileGuard = observability.start_reconcile();
        observability.record_lock_contention();
        observability.record_policy_conflict();
        observability.record_invalid_spec();
        observability.record_database_connection_failure();
        observability.record_plan_result("drift");
        observability.record_planned_changes(2);
        observability.record_apply_result("success");
        observability.record_apply_statements(4);
        guard.record_result("conflict", "ConflictingPolicy");

        provider.force_flush().expect("flush should succeed");

        let metrics = exporter
            .get_finished_metrics()
            .expect("metrics should be exported");

        assert_eq!(u64_sum_value(&metrics, "pgroles.reconcile.total"), Some(1));
        assert!(metric_exists(&metrics, "pgroles.reconcile.duration"));
        assert_eq!(u64_sum_value(&metrics, "pgroles.plan.total"), Some(1));
        assert_eq!(u64_sum_value(&metrics, "pgroles.plan.changes"), Some(2));
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.lock_contention.total"),
            Some(1)
        );
        assert_eq!(u64_sum_value(&metrics, "pgroles.policy.conflicts"), Some(1));
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.invalid_spec.total"),
            Some(1)
        );
        assert_eq!(
            u64_sum_value(&metrics, "pgroles.database.connection_failures"),
            Some(1)
        );
        assert_eq!(u64_sum_value(&metrics, "pgroles.apply.total"), Some(1));
        assert_eq!(u64_sum_value(&metrics, "pgroles.apply.statements"), Some(4));
        assert_eq!(
            i64_sum_value(&metrics, "pgroles.reconcile.inflight"),
            Some(0)
        );
    }
}