gcloud_spanner/
metrics.rs

1#[cfg(feature = "otel-metrics")]
2use std::collections::HashMap;
3#[cfg(feature = "otel-metrics")]
4use std::sync::atomic::AtomicU64;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::grpc::metadata::MetadataMap;
9use thiserror::Error;
10
/// User-facing configuration for client-side metrics.
///
/// `MetricsConfig::default()` leaves metrics disabled (`enabled == false`).
#[derive(Clone, Default)]
pub struct MetricsConfig {
    /// Enables OpenTelemetry metrics emission when the `otel-metrics` feature is active.
    pub enabled: bool,
    /// Meter provider used to create the instruments. When `None`, the process-wide
    /// global OpenTelemetry meter provider is used instead.
    #[cfg(feature = "otel-metrics")]
    pub meter_provider: Option<Arc<dyn opentelemetry::metrics::MeterProvider + Send + Sync>>,
}
18
19impl std::fmt::Debug for MetricsConfig {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        let mut ds = f.debug_struct("MetricsConfig");
22        ds.field("enabled", &self.enabled);
23        #[cfg(feature = "otel-metrics")]
24        {
25            let provider = self.meter_provider.is_some();
26            ds.field("meter_provider_present", &provider);
27        }
28        ds.finish()
29    }
30}
31
/// Crate-internal handle through which the client records metrics.
///
/// Cloning shares the same underlying instruments (an `Arc`). When metrics are disabled,
/// or the `otel-metrics` feature is off, every recording method is a no-op.
#[derive(Clone, Default)]
pub(crate) struct MetricsRecorder {
    /// `Some` only when metrics were enabled in the `MetricsConfig` passed to `try_new`.
    #[cfg(feature = "otel-metrics")]
    inner: Option<Arc<OtelMetrics>>,
}
37
/// Errors produced while setting up metrics recording.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// The database name is not of the form
    /// `projects/{project}/instances/{instance}/databases/{database}`; carries the
    /// rejected name.
    #[error("invalid database name: {0}")]
    InvalidDatabase(String),
}
43
// The fields are only read by the OpenTelemetry gauge callbacks, which are compiled
// solely with the `otel-metrics` feature; without it they would be flagged as unused.
#[allow(dead_code)]
#[derive(Clone, Debug)]
/// Point-in-time view of the session pool, sampled by the session-pool gauges.
pub(crate) struct SessionPoolSnapshot {
    /// Reported as `open_session_count`.
    pub open_sessions: usize,
    /// Reported on `num_sessions_in_pool` with `type = num_in_use_sessions`.
    pub sessions_in_use: usize,
    /// Reported on `num_sessions_in_pool` with `type = num_sessions`.
    pub idle_sessions: usize,
    /// Reported as `max_allowed_sessions`.
    pub max_allowed_sessions: usize,
    /// Reported as `max_in_use_sessions`; the window length is decided by the pool
    /// (the metric description says 10 minutes — TODO confirm against the pool).
    pub max_in_use_last_window: usize,
    /// When `true`, `open_session_count` additionally reports `1` with
    /// `is_multiplexed = true`.
    pub has_multiplexed_session: bool,
}
54
/// Callback producing a fresh [`SessionPoolSnapshot`]; invoked from the gauge callbacks,
/// hence the `Send + Sync` bounds.
pub(crate) type SessionPoolStatsFn = Arc<dyn Fn() -> SessionPoolSnapshot + Send + Sync>;
56
57impl MetricsRecorder {
58    pub fn try_new(database: &str, config: &MetricsConfig) -> Result<Self, MetricsError> {
59        #[cfg(feature = "otel-metrics")]
60        {
61            if config.enabled {
62                let parsed = parse_database_name(database)?;
63                let inner = OtelMetrics::new(parsed, config.meter_provider.clone());
64                Ok(Self {
65                    inner: Some(Arc::new(inner)),
66                })
67            } else {
68                Ok(Self { inner: None })
69            }
70        }
71        #[cfg(not(feature = "otel-metrics"))]
72        {
73            let _ = database;
74            let _ = config;
75            Ok(Self::default())
76        }
77    }
78
79    pub(crate) fn register_session_pool(&self, stats: SessionPoolStatsFn) {
80        #[cfg(feature = "otel-metrics")]
81        if let Some(inner) = &self.inner {
82            inner.register_session_pool(stats);
83        }
84        #[cfg(not(feature = "otel-metrics"))]
85        {
86            let _ = stats;
87        }
88    }
89
90    pub(crate) fn record_session_timeout(&self) {
91        #[cfg(feature = "otel-metrics")]
92        if let Some(inner) = &self.inner {
93            inner.record_session_timeout();
94        }
95    }
96
97    pub(crate) fn record_session_acquired(&self) {
98        #[cfg(feature = "otel-metrics")]
99        if let Some(inner) = &self.inner {
100            inner.record_session_acquired();
101        }
102    }
103
104    pub(crate) fn record_session_released(&self) {
105        #[cfg(feature = "otel-metrics")]
106        if let Some(inner) = &self.inner {
107            inner.record_session_released();
108        }
109    }
110
111    pub(crate) fn record_session_acquire_latency(&self, duration: Duration) {
112        #[cfg(feature = "otel-metrics")]
113        if let Some(inner) = &self.inner {
114            inner.record_session_acquire_latency(duration);
115        }
116        #[cfg(not(feature = "otel-metrics"))]
117        {
118            let _ = duration;
119        }
120    }
121
122    pub(crate) fn record_server_timing(&self, method: &'static str, metadata: &MetadataMap) {
123        #[cfg(feature = "otel-metrics")]
124        if let Some(inner) = &self.inner {
125            let metrics = parse_server_timing(metadata);
126            inner.record_gfe_metrics(method, metrics);
127        }
128        #[cfg(not(feature = "otel-metrics"))]
129        {
130            let _ = method;
131            let _ = metadata;
132        }
133    }
134}
135
#[cfg(feature = "otel-metrics")]
mod otel_impl {
    //! OpenTelemetry-backed implementation of the client metrics; compiled only with the
    //! `otel-metrics` feature.

    use super::{
        ParsedDatabaseName, ServerTimingMetrics, SessionPoolStatsFn, ATTR_CLIENT_ID, ATTR_DATABASE,
        ATTR_INSTANCE, ATTR_IS_MULTIPLEXED, ATTR_LIB_VERSION, ATTR_METHOD, ATTR_PROJECT, ATTR_TYPE,
        CLIENT_ID_SEQ, GFE_BUCKETS, GFE_TIMING_HEADER, METRICS_PREFIX, OTEL_SCOPE,
        SESSION_ACQUIRE_BUCKETS,
    };
    use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, ObservableGauge};
    use opentelemetry::{global, InstrumentationScope, KeyValue};
    use std::sync::atomic::Ordering;
    use std::sync::{Arc, Mutex, PoisonError};
    use std::time::Duration;

    /// All instruments for one client, created from a single meter, plus the attribute
    /// sets attached to every recording.
    pub(super) struct OtelMetrics {
        /// Kept so the session-pool gauges can be created later, on registration.
        meter: Meter,
        /// Pre-built, shared attribute sets for this client.
        attributes: AttributeSets,
        /// `Some` once a session pool has been registered; guards against registering
        /// the gauges twice.
        session_gauges: Mutex<Option<SessionGaugeHandles>>,
        get_session_timeouts: Counter<u64>,
        acquired_sessions: Counter<u64>,
        released_sessions: Counter<u64>,
        gfe_latency: Histogram<f64>,
        gfe_header_missing: Counter<u64>,
        session_acquire_latency: Histogram<f64>,
    }

    /// Holds the observable gauges so they live as long as [`OtelMetrics`]; the fields
    /// are never read.
    struct SessionGaugeHandles {
        _open_session_count: ObservableGauge<i64>,
        _max_allowed_sessions: ObservableGauge<i64>,
        _num_sessions: ObservableGauge<i64>,
        _max_in_use_sessions: ObservableGauge<i64>,
    }

    /// Builds the full instrument name: `METRICS_PREFIX` followed by `suffix`.
    fn metric_name(suffix: &str) -> String {
        format!("{METRICS_PREFIX}{suffix}")
    }

    impl OtelMetrics {
        /// Creates every instrument for the database described by `parsed`.
        ///
        /// Instruments come from `meter_provider` when given, otherwise from the global
        /// OpenTelemetry meter provider.
        pub(super) fn new(
            parsed: ParsedDatabaseName,
            meter_provider: Option<Arc<dyn MeterProvider + Send + Sync>>,
        ) -> Self {
            let scope = InstrumentationScope::builder(OTEL_SCOPE)
                .with_version(env!("CARGO_PKG_VERSION"))
                .build();
            let meter = if let Some(provider) = meter_provider {
                provider.meter_with_scope(scope)
            } else {
                global::meter_provider().meter_with_scope(scope)
            };

            let attributes = AttributeSets::new(parsed);

            let get_session_timeouts = meter
                .u64_counter(metric_name("get_session_timeouts"))
                .with_description("The number of get sessions timeouts due to pool exhaustion.")
                .with_unit("1")
                .build();
            let acquired_sessions = meter
                .u64_counter(metric_name("num_acquired_sessions"))
                .with_description("The number of sessions acquired from the session pool.")
                .with_unit("1")
                .build();
            let released_sessions = meter
                .u64_counter(metric_name("num_released_sessions"))
                .with_description("The number of sessions released by the user and pool maintainer.")
                .with_unit("1")
                .build();
            let gfe_latency = meter
                .f64_histogram(metric_name("gfe_latency"))
                .with_description("Latency between Google's network receiving an RPC and reading back the first byte of the response.")
                .with_unit("ms")
                .with_boundaries(GFE_BUCKETS.to_vec())
                .build();
            let gfe_header_missing = meter
                .u64_counter(metric_name("gfe_header_missing_count"))
                .with_description("Number of RPC responses received without the server-timing header, most likely meaning the RPC never reached Google's network.")
                .with_unit("1")
                .build();
            let session_acquire_latency = meter
                .f64_histogram(metric_name("session_acquire_latency"))
                .with_description("Time spent waiting to acquire a session from the pool.")
                .with_unit("ms")
                .with_boundaries(SESSION_ACQUIRE_BUCKETS.to_vec())
                .build();

            OtelMetrics {
                meter,
                attributes,
                session_gauges: Mutex::new(None),
                get_session_timeouts,
                acquired_sessions,
                released_sessions,
                gfe_latency,
                gfe_header_missing,
                session_acquire_latency,
            }
        }

        /// Creates the session-pool gauges, each sampling `stats` in its callback.
        ///
        /// Only the first call has an effect; later calls return immediately.
        pub(super) fn register_session_pool(&self, stats: SessionPoolStatsFn) {
            // The guarded value is a plain `Option`, so a panic elsewhere cannot leave it
            // half-updated; recover from poisoning instead of propagating the panic.
            let mut guard = self
                .session_gauges
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if guard.is_some() {
                return;
            }

            let open_stats = stats.clone();
            let base = self.attributes.base.clone();
            let multiplexed = self.attributes.with_multiplexed.clone();
            let open_session_count = self
                .meter
                .i64_observable_gauge(metric_name("open_session_count"))
                .with_description("Number of sessions currently opened.")
                .with_unit("1")
                .with_callback(move |observer| {
                    let snapshot = open_stats();
                    if snapshot.has_multiplexed_session {
                        observer.observe(1, multiplexed.as_ref());
                    }
                    observer.observe(snapshot.open_sessions as i64, base.as_ref());
                })
                .build();

            let max_allowed_stats = stats.clone();
            let base = self.attributes.base.clone();
            let max_allowed_sessions = self
                .meter
                .i64_observable_gauge(metric_name("max_allowed_sessions"))
                .with_description("The maximum number of sessions allowed. Configurable by the user.")
                .with_unit("1")
                .with_callback(move |observer| {
                    let snapshot = max_allowed_stats();
                    observer.observe(snapshot.max_allowed_sessions as i64, base.as_ref());
                })
                .build();

            let sessions_stats = stats.clone();
            let in_use_attrs = self.attributes.num_in_use.clone();
            let idle_attrs = self.attributes.num_sessions.clone();
            let num_sessions = self
                .meter
                .i64_observable_gauge(metric_name("num_sessions_in_pool"))
                .with_description("The number of sessions currently in use.")
                .with_unit("1")
                .with_callback(move |observer| {
                    let snapshot = sessions_stats();
                    observer.observe(snapshot.sessions_in_use as i64, in_use_attrs.as_ref());
                    observer.observe(snapshot.idle_sessions as i64, idle_attrs.as_ref());
                })
                .build();

            let max_in_use_stats = stats;
            let attrs = self.attributes.max_in_use.clone();
            let max_in_use_sessions = self
                .meter
                .i64_observable_gauge(metric_name("max_in_use_sessions"))
                .with_description("The maximum number of sessions in use during the last 10 minute interval.")
                .with_unit("1")
                .with_callback(move |observer| {
                    let snapshot = max_in_use_stats();
                    observer.observe(snapshot.max_in_use_last_window as i64, attrs.as_ref());
                })
                .build();

            *guard = Some(SessionGaugeHandles {
                _open_session_count: open_session_count,
                _max_allowed_sessions: max_allowed_sessions,
                _num_sessions: num_sessions,
                _max_in_use_sessions: max_in_use_sessions,
            });
        }

        /// Adds 1 to `get_session_timeouts` (attributes: `is_multiplexed = false`).
        pub(super) fn record_session_timeout(&self) {
            self.get_session_timeouts
                .add(1, self.attributes.without_multiplexed.as_ref());
        }

        /// Adds 1 to `num_acquired_sessions` (attributes: `is_multiplexed = false`).
        pub(super) fn record_session_acquired(&self) {
            self.acquired_sessions
                .add(1, self.attributes.without_multiplexed.as_ref());
        }

        /// Adds 1 to `num_released_sessions` (attributes: `is_multiplexed = false`).
        pub(super) fn record_session_released(&self) {
            self.released_sessions
                .add(1, self.attributes.without_multiplexed.as_ref());
        }

        /// Records `duration`, converted to milliseconds, on `session_acquire_latency`.
        pub(super) fn record_session_acquire_latency(&self, duration: Duration) {
            let latency_ms = duration.as_secs_f64() * 1000.0;
            self.session_acquire_latency
                .record(latency_ms, self.attributes.base.as_ref());
        }

        /// Records GFE latency for `method` from parsed server-timing `metrics`.
        ///
        /// - No parsable server-timing entries: increments `gfe_header_missing_count`.
        /// - Entries present but no `GFE_TIMING_HEADER` entry: records nothing.
        /// - Otherwise: records the GFE duration with a `grpc_client_method` attribute.
        pub(super) fn record_gfe_metrics(&self, method: &'static str, metrics: ServerTimingMetrics) {
            if metrics.is_empty() {
                self.gfe_header_missing.add(1, self.attributes.base.as_ref());
                return;
            }
            // `value` falls back to 0.0 for absent keys. Recording that would report a
            // fake zero-latency response, so skip headers without a GFE entry.
            if !metrics.values.contains_key(GFE_TIMING_HEADER) {
                return;
            }
            let latency = metrics.value(GFE_TIMING_HEADER);

            let base = self.attributes.base.as_ref();
            let mut attrs: Vec<KeyValue> = Vec::with_capacity(base.len() + 1);
            attrs.extend_from_slice(base);
            attrs.push(KeyValue::new(ATTR_METHOD, method));
            self.gfe_latency.record(latency, &attrs);
        }
    }

    /// Attribute sets built once per client and shared by reference-counted slices.
    #[derive(Clone)]
    struct AttributeSets {
        /// client_id, database, instance_id, project_id, library_version.
        base: Arc<[KeyValue]>,
        /// `base` + `is_multiplexed = "true"`.
        with_multiplexed: Arc<[KeyValue]>,
        /// `base` + `is_multiplexed = "false"`.
        without_multiplexed: Arc<[KeyValue]>,
        /// `without_multiplexed` + `type = "num_in_use_sessions"`.
        num_in_use: Arc<[KeyValue]>,
        /// `without_multiplexed` + `type = "num_sessions"`.
        num_sessions: Arc<[KeyValue]>,
        /// Same keys and values as `without_multiplexed`.
        max_in_use: Arc<[KeyValue]>,
    }

    impl AttributeSets {
        /// Builds every attribute set, allocating a fresh client id.
        fn new(parsed: ParsedDatabaseName) -> Self {
            let client_id = next_client_id();
            let base_vec = vec![
                KeyValue::new(ATTR_CLIENT_ID, client_id),
                KeyValue::new(ATTR_DATABASE, parsed.database),
                KeyValue::new(ATTR_INSTANCE, parsed.instance),
                KeyValue::new(ATTR_PROJECT, parsed.project),
                KeyValue::new(ATTR_LIB_VERSION, env!("CARGO_PKG_VERSION")),
            ];

            let mut with_multiplexed_vec = base_vec.clone();
            with_multiplexed_vec.push(KeyValue::new(ATTR_IS_MULTIPLEXED, "true"));

            let mut without_multiplexed_vec = base_vec.clone();
            without_multiplexed_vec.push(KeyValue::new(ATTR_IS_MULTIPLEXED, "false"));

            let mut num_in_use_vec = without_multiplexed_vec.clone();
            num_in_use_vec.push(KeyValue::new(ATTR_TYPE, "num_in_use_sessions"));

            let mut num_sessions_vec = without_multiplexed_vec.clone();
            num_sessions_vec.push(KeyValue::new(ATTR_TYPE, "num_sessions"));

            let max_in_use_vec = without_multiplexed_vec.clone();

            AttributeSets {
                base: base_vec.into(),
                with_multiplexed: with_multiplexed_vec.into(),
                without_multiplexed: without_multiplexed_vec.into(),
                num_in_use: num_in_use_vec.into(),
                num_sessions: num_sessions_vec.into(),
                max_in_use: max_in_use_vec.into(),
            }
        }
    }

    /// Returns a process-unique id of the form `rust-client-N`.
    ///
    /// `Relaxed` is enough: only uniqueness of the fetched value matters.
    fn next_client_id() -> String {
        let id = CLIENT_ID_SEQ.fetch_add(1, Ordering::Relaxed);
        format!("rust-client-{id}")
    }
}
388
389#[cfg(feature = "otel-metrics")]
390use otel_impl::*;
391
// NOTE(review): this is the Go client's instrumentation scope name; presumably kept so
// metrics line up with the Go client's — confirm a Rust-specific scope is not intended.
/// Instrumentation scope name for the meter.
#[cfg(feature = "otel-metrics")]
const OTEL_SCOPE: &str = "cloud.google.com/go";
/// Prepended to every instrument name (e.g. `spanner/gfe_latency`).
#[cfg(feature = "otel-metrics")]
const METRICS_PREFIX: &str = "spanner/";
/// Attribute key: per-recorder id produced from `CLIENT_ID_SEQ`.
#[cfg(feature = "otel-metrics")]
const ATTR_CLIENT_ID: &str = "client_id";
/// Attribute key: database id parsed from the database name.
#[cfg(feature = "otel-metrics")]
const ATTR_DATABASE: &str = "database";
/// Attribute key: instance id parsed from the database name.
#[cfg(feature = "otel-metrics")]
const ATTR_INSTANCE: &str = "instance_id";
/// Attribute key: project id parsed from the database name.
#[cfg(feature = "otel-metrics")]
const ATTR_PROJECT: &str = "project_id";
/// Attribute key: this crate's `CARGO_PKG_VERSION`.
#[cfg(feature = "otel-metrics")]
const ATTR_LIB_VERSION: &str = "library_version";
/// Attribute key: `"true"` / `"false"` depending on the session kind being reported.
#[cfg(feature = "otel-metrics")]
const ATTR_IS_MULTIPLEXED: &str = "is_multiplexed";
/// Attribute key distinguishing the two series of `num_sessions_in_pool`.
#[cfg(feature = "otel-metrics")]
const ATTR_TYPE: &str = "type";
/// Attribute key: RPC method name attached to GFE latency recordings.
#[cfg(feature = "otel-metrics")]
const ATTR_METHOD: &str = "grpc_client_method";
/// Server-timing entry name whose duration is recorded as `gfe_latency`.
#[cfg(feature = "otel-metrics")]
const GFE_TIMING_HEADER: &str = "gfet4t7";
/// Response metadata key parsed by `parse_server_timing`.
#[cfg(feature = "otel-metrics")]
const SERVER_TIMING_HEADER: &str = "server-timing";

/// Histogram bucket boundaries, in milliseconds, for `session_acquire_latency`.
#[cfg(feature = "otel-metrics")]
const SESSION_ACQUIRE_BUCKETS: &[f64] = &[
    0.0, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 75.0, 100.0, 150.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 1500.0,
    2000.0, 3000.0, 4000.0, 5000.0, 7500.0, 10000.0, 15000.0, 30000.0, 60000.0,
];

/// Histogram bucket boundaries, in milliseconds, for `gfe_latency`.
#[cfg(feature = "otel-metrics")]
const GFE_BUCKETS: &[f64] = &[
    0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0,
    20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0,
    1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0,
];

/// Next value for the `client_id` attribute; the first recorder gets `rust-client-1`.
#[cfg(feature = "otel-metrics")]
static CLIENT_ID_SEQ: AtomicU64 = AtomicU64::new(1);
432
/// Components of a `projects/{project}/instances/{instance}/databases/{database}` name.
#[cfg(feature = "otel-metrics")]
#[derive(Clone)]
struct ParsedDatabaseName {
    /// The `{database}` segment.
    database: String,
    /// The `{instance}` segment.
    instance: String,
    /// The `{project}` segment.
    project: String,
}
440
/// Splits a fully-qualified database name into its project, instance and database ids.
///
/// # Errors
///
/// Returns [`MetricsError::InvalidDatabase`] (carrying `name`) unless `name` is exactly
/// `projects/{project}/instances/{instance}/databases/{database}` with all three ids
/// non-empty.
#[cfg(feature = "otel-metrics")]
fn parse_database_name(name: &str) -> Result<ParsedDatabaseName, MetricsError> {
    let parts: Vec<&str> = name.split('/').collect();
    match parts.as_slice() {
        ["projects", project, "instances", instance, "databases", database]
            // Empty ids (e.g. `projects//instances/...`) would yield blank attributes.
            if !project.is_empty() && !instance.is_empty() && !database.is_empty() =>
        {
            Ok(ParsedDatabaseName {
                project: (*project).to_string(),
                instance: (*instance).to_string(),
                database: (*database).to_string(),
            })
        }
        _ => Err(MetricsError::InvalidDatabase(name.to_string())),
    }
}
453
/// Durations parsed from `server-timing` response metadata, keyed by entry name.
#[cfg(feature = "otel-metrics")]
#[derive(Clone)]
struct ServerTimingMetrics {
    /// Entry name → `dur` value as parsed (the GFE entry is recorded in a histogram whose
    /// unit is ms).
    values: HashMap<String, f64>,
}
459
#[cfg(feature = "otel-metrics")]
impl ServerTimingMetrics {
    /// `true` when no server-timing entry with a parsable duration was found.
    fn is_empty(&self) -> bool {
        self.values.is_empty()
    }

    /// Duration recorded under `key`, or `0.0` when there is no such entry.
    fn value(&self, key: &str) -> f64 {
        self.values.get(key).map_or(0.0, |&duration| duration)
    }
}
470
/// Collects `name -> dur` pairs from every `server-timing` value in `metadata`.
///
/// Each header value is a comma-separated list of entries shaped like
/// `name;param=value;...`. For each entry, the first `dur` parameter (matched
/// case-insensitively, optionally quoted) is used.
///
/// Entries are dropped when they have no name, no `dur`, or a `dur` that is not a
/// finite number. Values that are not valid visible ASCII are ignored. A later entry
/// with the same name overwrites an earlier one.
#[cfg(feature = "otel-metrics")]
fn parse_server_timing(metadata: &MetadataMap) -> ServerTimingMetrics {
    let values = metadata
        .get_all(SERVER_TIMING_HEADER)
        .iter()
        .filter_map(|value| value.to_str().ok())
        .flat_map(|raw| raw.split(','))
        .filter_map(parse_server_timing_entry)
        .collect();
    ServerTimingMetrics { values }
}

/// Parses one `name;param=value;...` server-timing entry into `(name, dur)`.
///
/// The `dur` parameter may appear anywhere after the name (e.g. `x;desc="y";dur=5`).
#[cfg(feature = "otel-metrics")]
fn parse_server_timing_entry(entry: &str) -> Option<(String, f64)> {
    let mut params = entry.split(';');
    let name = params.next()?.trim();
    if name.is_empty() {
        return None;
    }
    let duration = params.find_map(|param| {
        let (key, value) = param.split_once('=')?;
        if !key.trim().eq_ignore_ascii_case("dur") {
            return None;
        }
        let value = value.trim();
        // Param values may be quoted strings; strip a matching pair of quotes.
        let value = value
            .strip_prefix('"')
            .and_then(|v| v.strip_suffix('"'))
            .unwrap_or(value);
        value.trim().parse::<f64>().ok().filter(|d| d.is_finite())
    })?;
    Some((name.to_string(), duration))
}
491
#[cfg(all(test, feature = "otel-metrics"))]
mod tests {
    //! Unit tests for the header and database-name parsers.

    use super::*;

    #[test]
    fn parses_server_timing_header() {
        let mut metadata = MetadataMap::new();
        metadata.insert(
            SERVER_TIMING_HEADER,
            "gfet4t7;dur=12.5,another-metric;dur=3.5".parse().unwrap(),
        );
        let metrics = parse_server_timing(&metadata);
        assert!(!metrics.is_empty());
        assert!((metrics.value(GFE_TIMING_HEADER) - 12.5).abs() < f64::EPSILON);
        assert!((metrics.value("another-metric") - 3.5).abs() < f64::EPSILON);
    }

    #[test]
    fn parses_server_timing_with_space_after_semicolon() {
        let mut metadata = MetadataMap::new();
        metadata.insert(SERVER_TIMING_HEADER, "gfet4t7; dur=7".parse().unwrap());
        let metrics = parse_server_timing(&metadata);
        assert!((metrics.value(GFE_TIMING_HEADER) - 7.0).abs() < f64::EPSILON);
    }

    #[test]
    fn missing_server_timing_header_yields_empty_metrics() {
        let metrics = parse_server_timing(&MetadataMap::new());
        assert!(metrics.is_empty());
        assert_eq!(metrics.value(GFE_TIMING_HEADER), 0.0);
    }

    #[test]
    fn parses_well_formed_database_name() {
        let parsed =
            parse_database_name("projects/my-project/instances/my-instance/databases/my-db")
                .unwrap();
        assert_eq!(parsed.project, "my-project");
        assert_eq!(parsed.instance, "my-instance");
        assert_eq!(parsed.database, "my-db");
    }

    #[test]
    fn rejects_malformed_database_name() {
        for name in [
            "",
            "projects/p/instances/i",
            "projects/p/instances/i/databases/d/extra",
            "project/p/instances/i/databases/d",
        ] {
            assert!(
                matches!(parse_database_name(name), Err(MetricsError::InvalidDatabase(n)) if n == name),
                "expected {name:?} to be rejected"
            );
        }
    }
}