1#[cfg(feature = "otel-metrics")]
2use std::collections::HashMap;
3#[cfg(feature = "otel-metrics")]
4use std::sync::atomic::AtomicU64;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::grpc::metadata::MetadataMap;
9use thiserror::Error;
10
/// User-facing configuration for client-side metrics.
///
/// `Default` yields a disabled configuration (`enabled == false`). Without the
/// `otel-metrics` feature, `enabled` has no effect: the recorder is always a no-op.
#[derive(Clone, Default)]
pub struct MetricsConfig {
    /// Whether metrics are recorded.
    pub enabled: bool,
    /// Meter provider to create instruments from. When `None`, the global
    /// OpenTelemetry meter provider is used instead.
    #[cfg(feature = "otel-metrics")]
    pub meter_provider: Option<Arc<dyn opentelemetry::metrics::MeterProvider + Send + Sync>>,
}
18
19impl std::fmt::Debug for MetricsConfig {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 let mut ds = f.debug_struct("MetricsConfig");
22 ds.field("enabled", &self.enabled);
23 #[cfg(feature = "otel-metrics")]
24 {
25 let provider = self.meter_provider.is_some();
26 ds.field("meter_provider_present", &provider);
27 }
28 ds.finish()
29 }
30}
31
/// Crate-internal handle through which the client records metrics.
///
/// Cloning is cheap: the OpenTelemetry state sits behind an `Arc`. The
/// `Default` value holds no state, so every recording method does nothing.
#[derive(Clone, Default)]
pub(crate) struct MetricsRecorder {
    /// `None` when metrics were not enabled at construction time.
    #[cfg(feature = "otel-metrics")]
    inner: Option<Arc<OtelMetrics>>,
}
37
/// Errors returned when constructing the metrics recorder.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// The database name is not of the form
    /// `projects/{project}/instances/{instance}/databases/{database}`.
    /// Carries the rejected name verbatim.
    #[error("invalid database name: {0}")]
    InvalidDatabase(String),
}
43
// The fields are only read by the session-pool gauges compiled under the
// `otel-metrics` feature; without that feature they would trigger dead-code
// warnings.
/// Point-in-time view of the session pool, sampled on each metrics collection.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub(crate) struct SessionPoolSnapshot {
    /// Reported by the `open_session_count` gauge.
    pub open_sessions: usize,
    /// Reported by `num_sessions_in_pool` with `type = "num_in_use_sessions"`.
    pub sessions_in_use: usize,
    /// Reported by `num_sessions_in_pool` with `type = "num_sessions"`.
    pub idle_sessions: usize,
    /// Reported by the `max_allowed_sessions` gauge.
    pub max_allowed_sessions: usize,
    /// Reported by the `max_in_use_sessions` gauge. The window length is decided
    /// by whoever builds the snapshot (the gauge description says 10 minutes).
    pub max_in_use_last_window: usize,
    /// When `true`, `open_session_count` additionally reports `1` with
    /// `is_multiplexed = "true"`.
    pub has_multiplexed_session: bool,
}
54
/// Sampler for the session pool. With `otel-metrics`, each of the four pool
/// gauges calls it once per collection, so it should be cheap; it must be
/// `Send + Sync` because the gauge callbacks require it.
pub(crate) type SessionPoolStatsFn = Arc<dyn Fn() -> SessionPoolSnapshot + Send + Sync>;
56
57impl MetricsRecorder {
58 pub fn try_new(database: &str, config: &MetricsConfig) -> Result<Self, MetricsError> {
59 #[cfg(feature = "otel-metrics")]
60 {
61 if config.enabled {
62 let parsed = parse_database_name(database)?;
63 let inner = OtelMetrics::new(parsed, config.meter_provider.clone());
64 Ok(Self {
65 inner: Some(Arc::new(inner)),
66 })
67 } else {
68 Ok(Self { inner: None })
69 }
70 }
71 #[cfg(not(feature = "otel-metrics"))]
72 {
73 let _ = database;
74 let _ = config;
75 Ok(Self::default())
76 }
77 }
78
79 pub(crate) fn register_session_pool(&self, stats: SessionPoolStatsFn) {
80 #[cfg(feature = "otel-metrics")]
81 if let Some(inner) = &self.inner {
82 inner.register_session_pool(stats);
83 }
84 #[cfg(not(feature = "otel-metrics"))]
85 {
86 let _ = stats;
87 }
88 }
89
90 pub(crate) fn record_session_timeout(&self) {
91 #[cfg(feature = "otel-metrics")]
92 if let Some(inner) = &self.inner {
93 inner.record_session_timeout();
94 }
95 }
96
97 pub(crate) fn record_session_acquired(&self) {
98 #[cfg(feature = "otel-metrics")]
99 if let Some(inner) = &self.inner {
100 inner.record_session_acquired();
101 }
102 }
103
104 pub(crate) fn record_session_released(&self) {
105 #[cfg(feature = "otel-metrics")]
106 if let Some(inner) = &self.inner {
107 inner.record_session_released();
108 }
109 }
110
111 pub(crate) fn record_session_acquire_latency(&self, duration: Duration) {
112 #[cfg(feature = "otel-metrics")]
113 if let Some(inner) = &self.inner {
114 inner.record_session_acquire_latency(duration);
115 }
116 #[cfg(not(feature = "otel-metrics"))]
117 {
118 let _ = duration;
119 }
120 }
121
122 pub(crate) fn record_server_timing(&self, method: &'static str, metadata: &MetadataMap) {
123 #[cfg(feature = "otel-metrics")]
124 if let Some(inner) = &self.inner {
125 let metrics = parse_server_timing(metadata);
126 inner.record_gfe_metrics(method, metrics);
127 }
128 #[cfg(not(feature = "otel-metrics"))]
129 {
130 let _ = method;
131 let _ = metadata;
132 }
133 }
134}
135
/// OpenTelemetry-backed implementation used by `MetricsRecorder` when the
/// `otel-metrics` feature is enabled.
///
/// All instruments come from one meter whose instrumentation scope is
/// `OTEL_SCOPE`, and every instrument name is prefixed with `METRICS_PREFIX`.
#[cfg(feature = "otel-metrics")]
mod otel_impl {
    use super::{
        ParsedDatabaseName, ServerTimingMetrics, SessionPoolStatsFn, ATTR_CLIENT_ID,
        ATTR_DATABASE, ATTR_INSTANCE, ATTR_IS_MULTIPLEXED, ATTR_LIB_VERSION, ATTR_METHOD,
        ATTR_PROJECT, ATTR_TYPE, CLIENT_ID_SEQ, GFE_BUCKETS, GFE_TIMING_HEADER, METRICS_PREFIX,
        OTEL_SCOPE, SESSION_ACQUIRE_BUCKETS,
    };
    use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, ObservableGauge};
    use opentelemetry::{global, InstrumentationScope, KeyValue};
    use std::sync::atomic::Ordering;
    use std::sync::{Arc, Mutex, PoisonError};
    use std::time::Duration;

    /// OpenTelemetry instruments plus precomputed attribute sets for one client.
    pub(super) struct OtelMetrics {
        /// Kept so the session-pool gauges can be created after construction.
        meter: Meter,
        /// Attribute sets shared by every measurement from this client.
        attributes: AttributeSets,
        /// `Some` once `register_session_pool` has created the pool gauges.
        session_gauges: Mutex<Option<SessionGaugeHandles>>,
        get_session_timeouts: Counter<u64>,
        acquired_sessions: Counter<u64>,
        released_sessions: Counter<u64>,
        /// GFE latency in milliseconds, taken from the `server-timing` header.
        gfe_latency: Histogram<f64>,
        /// Responses whose `server-timing` data yielded no parsed timings.
        gfe_header_missing: Counter<u64>,
        /// Session acquisition wait time in milliseconds.
        session_acquire_latency: Histogram<f64>,
    }

    /// Owns the pool gauges (the fields are never read). NOTE(review): presumably
    /// stored to tie their lifetime to the recorder — confirm against the
    /// opentelemetry version in use.
    struct SessionGaugeHandles {
        _open_session_count: ObservableGauge<i64>,
        _max_allowed_sessions: ObservableGauge<i64>,
        _num_sessions: ObservableGauge<i64>,
        _max_in_use_sessions: ObservableGauge<i64>,
    }

    /// Converts a pool count to a gauge value, saturating at `i64::MAX` instead
    /// of wrapping to a negative number as an `as` cast would.
    fn gauge_value(count: usize) -> i64 {
        i64::try_from(count).unwrap_or(i64::MAX)
    }

    impl OtelMetrics {
        /// Creates the meter and every counter/histogram for one client.
        ///
        /// Instruments come from `meter_provider` when given, otherwise from the
        /// global OpenTelemetry meter provider. Session-pool gauges are created
        /// later by `register_session_pool`.
        pub(super) fn new(
            parsed: ParsedDatabaseName,
            meter_provider: Option<Arc<dyn MeterProvider + Send + Sync>>,
        ) -> Self {
            let scope = InstrumentationScope::builder(OTEL_SCOPE)
                .with_version(env!("CARGO_PKG_VERSION"))
                .build();
            let meter = match meter_provider {
                Some(provider) => provider.meter_with_scope(scope),
                None => global::meter_provider().meter_with_scope(scope),
            };

            let attributes = AttributeSets::new(parsed);

            let get_session_timeouts = meter
                .u64_counter(METRICS_PREFIX.to_owned() + "get_session_timeouts")
                .with_description("The number of get sessions timeouts due to pool exhaustion.")
                .with_unit("1")
                .build();
            let acquired_sessions = meter
                .u64_counter(METRICS_PREFIX.to_owned() + "num_acquired_sessions")
                .with_description("The number of sessions acquired from the session pool.")
                .with_unit("1")
                .build();
            let released_sessions = meter
                .u64_counter(METRICS_PREFIX.to_owned() + "num_released_sessions")
                .with_description("The number of sessions released by the user and pool maintainer.")
                .with_unit("1")
                .build();
            let gfe_latency = meter
                .f64_histogram(METRICS_PREFIX.to_owned() + "gfe_latency")
                .with_description("Latency between Google's network receiving an RPC and reading back the first byte of the response.")
                .with_unit("ms")
                .with_boundaries(GFE_BUCKETS.to_vec())
                .build();
            let gfe_header_missing = meter
                .u64_counter(METRICS_PREFIX.to_owned() + "gfe_header_missing_count")
                .with_description("Number of RPC responses received without the server-timing header, most likely meaning the RPC never reached Google's network.")
                .with_unit("1")
                .build();
            let session_acquire_latency = meter
                .f64_histogram(METRICS_PREFIX.to_owned() + "session_acquire_latency")
                .with_description("Time spent waiting to acquire a session from the pool.")
                .with_unit("ms")
                .with_boundaries(SESSION_ACQUIRE_BUCKETS.to_vec())
                .build();

            OtelMetrics {
                meter,
                attributes,
                session_gauges: Mutex::new(None),
                get_session_timeouts,
                acquired_sessions,
                released_sessions,
                gfe_latency,
                gfe_header_missing,
                session_acquire_latency,
            }
        }

        /// Creates the observable pool gauges, each sampling `stats` on every
        /// collection. Only the first call has an effect; later calls return
        /// immediately.
        pub(super) fn register_session_pool(&self, stats: SessionPoolStatsFn) {
            // The guard is only taken here and the handles are stored as the last
            // step, so a poisoned lock still holds `None` and registration can proceed.
            let mut guard = self
                .session_gauges
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if guard.is_some() {
                return;
            }

            let open_stats = stats.clone();
            let base = self.attributes.base.clone();
            let multiplexed = self.attributes.with_multiplexed.clone();
            let open_session_count = self
                .meter
                .i64_observable_gauge(METRICS_PREFIX.to_owned() + "open_session_count")
                .with_description("Number of sessions currently opened.")
                .with_unit("1")
                .with_callback(move |observer| {
                    let snapshot = open_stats();
                    if snapshot.has_multiplexed_session {
                        observer.observe(1, multiplexed.as_ref());
                    }
                    observer.observe(gauge_value(snapshot.open_sessions), base.as_ref());
                })
                .build();

            let max_allowed_stats = stats.clone();
            let base = self.attributes.base.clone();
            let max_allowed_sessions = self
                .meter
                .i64_observable_gauge(METRICS_PREFIX.to_owned() + "max_allowed_sessions")
                .with_description("The maximum number of sessions allowed. Configurable by the user.")
                .with_unit("1")
                .with_callback(move |observer| {
                    let snapshot = max_allowed_stats();
                    observer.observe(gauge_value(snapshot.max_allowed_sessions), base.as_ref());
                })
                .build();

            // One gauge, two series distinguished by the `type` attribute.
            let sessions_stats = stats.clone();
            let in_use_attrs = self.attributes.num_in_use.clone();
            let idle_attrs = self.attributes.num_sessions.clone();
            let num_sessions = self
                .meter
                .i64_observable_gauge(METRICS_PREFIX.to_owned() + "num_sessions_in_pool")
                .with_description("The number of sessions currently in use.")
                .with_unit("1")
                .with_callback(move |observer| {
                    let snapshot = sessions_stats();
                    observer.observe(gauge_value(snapshot.sessions_in_use), in_use_attrs.as_ref());
                    observer.observe(gauge_value(snapshot.idle_sessions), idle_attrs.as_ref());
                })
                .build();

            let max_in_use_stats = stats;
            let attrs = self.attributes.max_in_use.clone();
            let max_in_use_sessions = self
                .meter
                .i64_observable_gauge(METRICS_PREFIX.to_owned() + "max_in_use_sessions")
                .with_description("The maximum number of sessions in use during the last 10 minute interval.")
                .with_unit("1")
                .with_callback(move |observer| {
                    let snapshot = max_in_use_stats();
                    observer.observe(gauge_value(snapshot.max_in_use_last_window), attrs.as_ref());
                })
                .build();

            *guard = Some(SessionGaugeHandles {
                _open_session_count: open_session_count,
                _max_allowed_sessions: max_allowed_sessions,
                _num_sessions: num_sessions,
                _max_in_use_sessions: max_in_use_sessions,
            });
        }

        /// Adds one to `get_session_timeouts`.
        pub(super) fn record_session_timeout(&self) {
            self.get_session_timeouts
                .add(1, self.attributes.without_multiplexed.as_ref());
        }

        /// Adds one to `num_acquired_sessions`.
        pub(super) fn record_session_acquired(&self) {
            self.acquired_sessions
                .add(1, self.attributes.without_multiplexed.as_ref());
        }

        /// Adds one to `num_released_sessions`.
        pub(super) fn record_session_released(&self) {
            self.released_sessions
                .add(1, self.attributes.without_multiplexed.as_ref());
        }

        /// Records `duration` in milliseconds (fractional) into `session_acquire_latency`.
        pub(super) fn record_session_acquire_latency(&self, duration: Duration) {
            let latency_ms = duration.as_secs_f64() * 1000.0;
            self.session_acquire_latency
                .record(latency_ms, self.attributes.base.as_ref());
        }

        /// Records the GFE latency for `method` from parsed `server-timing` data.
        ///
        /// When no timings were parsed at all, `gfe_header_missing_count` is
        /// incremented instead. When timings exist but none is the GFE entry,
        /// nothing is recorded.
        pub(super) fn record_gfe_metrics(&self, method: &'static str, metrics: ServerTimingMetrics) {
            if metrics.is_empty() {
                self.gfe_header_missing.add(1, self.attributes.base.as_ref());
                return;
            }
            // The header may list other named timings without a `gfet4t7` entry;
            // `value()` would then return 0.0, and recording that as a real 0 ms
            // latency would skew the histogram.
            if !metrics.values.contains_key(GFE_TIMING_HEADER) {
                return;
            }

            let mut attrs: Vec<KeyValue> = self.attributes.base.to_vec();
            attrs.push(KeyValue::new(ATTR_METHOD, method));
            self.gfe_latency
                .record(metrics.value(GFE_TIMING_HEADER), &attrs);
        }
    }

    /// Attribute lists built once per client and shared (via `Arc`) by the
    /// recording paths and gauge callbacks.
    #[derive(Clone)]
    struct AttributeSets {
        /// client_id, database, instance_id, project_id, library_version.
        base: Arc<[KeyValue]>,
        /// `base` + `is_multiplexed = "true"`.
        with_multiplexed: Arc<[KeyValue]>,
        /// `base` + `is_multiplexed = "false"`.
        without_multiplexed: Arc<[KeyValue]>,
        /// `without_multiplexed` + `type = "num_in_use_sessions"`.
        num_in_use: Arc<[KeyValue]>,
        /// `without_multiplexed` + `type = "num_sessions"`.
        num_sessions: Arc<[KeyValue]>,
        /// Same content as `without_multiplexed`.
        max_in_use: Arc<[KeyValue]>,
    }

    impl AttributeSets {
        /// Builds every attribute set, allocating a fresh process-unique client id.
        fn new(parsed: ParsedDatabaseName) -> Self {
            let client_id = next_client_id();
            let base_vec = vec![
                KeyValue::new(ATTR_CLIENT_ID, client_id),
                KeyValue::new(ATTR_DATABASE, parsed.database),
                KeyValue::new(ATTR_INSTANCE, parsed.instance),
                KeyValue::new(ATTR_PROJECT, parsed.project),
                KeyValue::new(ATTR_LIB_VERSION, env!("CARGO_PKG_VERSION")),
            ];

            let mut with_multiplexed_vec = base_vec.clone();
            with_multiplexed_vec.push(KeyValue::new(ATTR_IS_MULTIPLEXED, "true"));

            let mut without_multiplexed_vec = base_vec.clone();
            without_multiplexed_vec.push(KeyValue::new(ATTR_IS_MULTIPLEXED, "false"));

            let mut num_in_use_vec = without_multiplexed_vec.clone();
            num_in_use_vec.push(KeyValue::new(ATTR_TYPE, "num_in_use_sessions"));

            let mut num_sessions_vec = without_multiplexed_vec.clone();
            num_sessions_vec.push(KeyValue::new(ATTR_TYPE, "num_sessions"));

            let max_in_use_vec = without_multiplexed_vec.clone();

            AttributeSets {
                base: base_vec.into(),
                with_multiplexed: with_multiplexed_vec.into(),
                without_multiplexed: without_multiplexed_vec.into(),
                num_in_use: num_in_use_vec.into(),
                num_sessions: num_sessions_vec.into(),
                max_in_use: max_in_use_vec.into(),
            }
        }
    }

    /// Returns `rust-client-{n}` with `n` taken from the process-wide counter.
    fn next_client_id() -> String {
        let id = CLIENT_ID_SEQ.fetch_add(1, Ordering::Relaxed);
        format!("rust-client-{id}")
    }
}
388
389#[cfg(feature = "otel-metrics")]
390use otel_impl::*;
391
/// Instrumentation scope name of the meter. NOTE(review): this is the Go
/// client's scope string; presumably kept so exported metrics line up with
/// existing Go-based dashboards — confirm this is intentional.
#[cfg(feature = "otel-metrics")]
const OTEL_SCOPE: &str = "cloud.google.com/go";
/// Prefix prepended to every instrument name.
#[cfg(feature = "otel-metrics")]
const METRICS_PREFIX: &str = "spanner/";
/// Attribute key for the per-client id (`rust-client-{n}`).
#[cfg(feature = "otel-metrics")]
const ATTR_CLIENT_ID: &str = "client_id";
/// Attribute key for the database id parsed from the database name.
#[cfg(feature = "otel-metrics")]
const ATTR_DATABASE: &str = "database";
/// Attribute key for the instance id parsed from the database name.
#[cfg(feature = "otel-metrics")]
const ATTR_INSTANCE: &str = "instance_id";
/// Attribute key for the project id parsed from the database name.
#[cfg(feature = "otel-metrics")]
const ATTR_PROJECT: &str = "project_id";
/// Attribute key carrying this crate's version.
#[cfg(feature = "otel-metrics")]
const ATTR_LIB_VERSION: &str = "library_version";
/// Attribute key set to `"true"` / `"false"` to split multiplexed-session series.
#[cfg(feature = "otel-metrics")]
const ATTR_IS_MULTIPLEXED: &str = "is_multiplexed";
/// Attribute key distinguishing the series of the `num_sessions_in_pool` gauge.
#[cfg(feature = "otel-metrics")]
const ATTR_TYPE: &str = "type";
/// Attribute key for the RPC method on GFE latency measurements.
#[cfg(feature = "otel-metrics")]
const ATTR_METHOD: &str = "grpc_client_method";
/// Name of the `server-timing` entry that carries the GFE latency (an entry
/// name inside the header value, not a header name).
#[cfg(feature = "otel-metrics")]
const GFE_TIMING_HEADER: &str = "gfet4t7";
/// Response metadata key holding the server-timing entries.
#[cfg(feature = "otel-metrics")]
const SERVER_TIMING_HEADER: &str = "server-timing";
416
/// Histogram bucket boundaries, in milliseconds, for `session_acquire_latency`.
#[cfg(feature = "otel-metrics")]
const SESSION_ACQUIRE_BUCKETS: &[f64] = &[
    0.0, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 75.0, 100.0, 150.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 1500.0,
    2000.0, 3000.0, 4000.0, 5000.0, 7500.0, 10000.0, 15000.0, 30000.0, 60000.0,
];
422
/// Histogram bucket boundaries, in milliseconds, for `gfe_latency`: fine
/// 1 ms steps below 20 ms, widening up to 3,200,000 ms.
#[cfg(feature = "otel-metrics")]
const GFE_BUCKETS: &[f64] = &[
    0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0,
    20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0,
    1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0,
];
429
/// Process-wide sequence for client ids; the first enabled recorder gets 1.
#[cfg(feature = "otel-metrics")]
static CLIENT_ID_SEQ: AtomicU64 = AtomicU64::new(1);
432
/// Components of `projects/{project}/instances/{instance}/databases/{database}`,
/// used as metric attribute values.
#[cfg(feature = "otel-metrics")]
#[derive(Clone)]
struct ParsedDatabaseName {
    database: String,
    instance: String,
    project: String,
}
440
/// Splits a fully-qualified Spanner database name into its components.
///
/// The name must be exactly `projects/{project}/instances/{instance}/databases/{database}`
/// with every `{…}` segment non-empty.
///
/// # Errors
///
/// Returns [`MetricsError::InvalidDatabase`] carrying `name` when the shape does
/// not match or any id segment is empty.
#[cfg(feature = "otel-metrics")]
fn parse_database_name(name: &str) -> Result<ParsedDatabaseName, MetricsError> {
    let segments: Vec<&str> = name.split('/').collect();
    match segments.as_slice() {
        // Empty ids (e.g. `projects//instances/...`) are rejected; they would
        // otherwise be exported as empty attribute values.
        ["projects", project, "instances", instance, "databases", database]
            if !project.is_empty() && !instance.is_empty() && !database.is_empty() =>
        {
            Ok(ParsedDatabaseName {
                project: project.to_string(),
                instance: instance.to_string(),
                database: database.to_string(),
            })
        }
        _ => Err(MetricsError::InvalidDatabase(name.to_string())),
    }
}
453
/// Named durations parsed from `server-timing` response headers, keyed by
/// entry name (e.g. `gfet4t7`). Values are the entries' `dur` numbers.
#[cfg(feature = "otel-metrics")]
#[derive(Clone)]
struct ServerTimingMetrics {
    values: HashMap<String, f64>,
}
459
#[cfg(feature = "otel-metrics")]
impl ServerTimingMetrics {
    /// `true` when no named duration was parsed from the headers.
    fn is_empty(&self) -> bool {
        self.values.is_empty()
    }

    /// Duration parsed for `key`, or `0.0` if there was no such entry.
    fn value(&self, key: &str) -> f64 {
        match self.values.get(key) {
            Some(&duration) => duration,
            None => 0.0,
        }
    }
}
470
/// Parses every `server-timing` value in `metadata` into a name → duration map.
///
/// Each header value is a comma-separated list of entries shaped like
/// `name;param=value;...`. Values that are not valid visible ASCII are ignored.
/// If the same entry name appears more than once, the last occurrence wins.
#[cfg(feature = "otel-metrics")]
fn parse_server_timing(metadata: &MetadataMap) -> ServerTimingMetrics {
    let mut values = HashMap::new();
    let raw_values = metadata
        .get_all(SERVER_TIMING_HEADER)
        .iter()
        .filter_map(|value| value.to_str().ok());
    for raw in raw_values {
        for entry in raw.split(',') {
            if let Some((name, duration)) = parse_server_timing_entry(entry) {
                values.insert(name.to_string(), duration);
            }
        }
    }
    ServerTimingMetrics { values }
}

/// Parses one `name;param=value;...` entry into its name and `dur` value.
///
/// The first `dur` parameter is used wherever it appears among the params. Its
/// name is matched case-insensitively, whitespace around `=` is allowed, and a
/// double-quoted value is unquoted. Returns `None` for an empty name, a missing
/// or unparseable `dur`, or a duration that is not finite and non-negative
/// (`str::parse::<f64>` would otherwise accept `NaN`/`inf`/negatives).
#[cfg(feature = "otel-metrics")]
fn parse_server_timing_entry(entry: &str) -> Option<(&str, f64)> {
    let mut pieces = entry.split(';');
    let name = pieces.next()?.trim();
    if name.is_empty() {
        return None;
    }
    let raw_dur = pieces.find_map(|param| {
        let (key, value) = param.split_once('=')?;
        key.trim().eq_ignore_ascii_case("dur").then(|| value.trim())
    })?;
    let unquoted = raw_dur
        .strip_prefix('"')
        .and_then(|v| v.strip_suffix('"'))
        .unwrap_or(raw_dur);
    let duration: f64 = unquoted.trim().parse().ok()?;
    if duration.is_finite() && duration >= 0.0 {
        Some((name, duration))
    } else {
        None
    }
}
491
#[cfg(all(test, feature = "otel-metrics"))]
mod tests {
    use super::*;

    /// Builds metadata with one `server-timing` entry per element of `values`.
    fn metadata_with(values: &[&'static str]) -> MetadataMap {
        let mut metadata = MetadataMap::new();
        for value in values {
            metadata.append(SERVER_TIMING_HEADER, value.parse().unwrap());
        }
        metadata
    }

    #[test]
    fn parses_server_timing_header() {
        let mut metadata = MetadataMap::new();
        metadata.insert(SERVER_TIMING_HEADER, "gfet4t7;dur=12.5,another-metric;dur=3.5".parse().unwrap());
        let metrics = parse_server_timing(&metadata);
        assert!(!metrics.is_empty());
        assert!((metrics.value(GFE_TIMING_HEADER) - 12.5).abs() < f64::EPSILON);
    }

    #[test]
    fn parses_entry_with_space_before_dur() {
        let metrics = parse_server_timing(&metadata_with(&["gfet4t7; dur=7"]));
        assert!((metrics.value(GFE_TIMING_HEADER) - 7.0).abs() < f64::EPSILON);
    }

    #[test]
    fn merges_multiple_header_values() {
        let metrics = parse_server_timing(&metadata_with(&["gfet4t7;dur=1", "other;dur=2"]));
        assert!((metrics.value(GFE_TIMING_HEADER) - 1.0).abs() < f64::EPSILON);
        assert!((metrics.value("other") - 2.0).abs() < f64::EPSILON);
    }

    #[test]
    fn missing_header_yields_empty_metrics() {
        let metrics = parse_server_timing(&MetadataMap::new());
        assert!(metrics.is_empty());
        assert_eq!(metrics.value(GFE_TIMING_HEADER), 0.0);
    }

    #[test]
    fn skips_unparseable_duration() {
        let metrics = parse_server_timing(&metadata_with(&["gfet4t7;dur=abc"]));
        assert!(metrics.is_empty());
    }

    #[test]
    fn parses_valid_database_name() {
        let parsed = parse_database_name("projects/p1/instances/i1/databases/d1")
            .unwrap_or_else(|e| panic!("valid name rejected: {e}"));
        assert_eq!(parsed.project, "p1");
        assert_eq!(parsed.instance, "i1");
        assert_eq!(parsed.database, "d1");
    }

    #[test]
    fn rejects_malformed_database_names() {
        for name in [
            "",
            "projects/p/instances/i",
            "projects/p/instances/i/databases/d/extra",
            "project/p/instances/i/databases/d",
        ] {
            let err = parse_database_name(name)
                .err()
                .unwrap_or_else(|| panic!("accepted malformed name {name:?}"));
            match err {
                MetricsError::InvalidDatabase(rejected) => assert_eq!(rejected, name),
            }
        }
    }
}