sqlx_otel/pool.rs
1use std::sync::Arc;
2use std::time::Duration;
3
4use opentelemetry_semantic_conventions::metric as semconv_metric;
5
6use crate::annotations::{Annotated, QueryAnnotations};
7use crate::attributes::{ConnectionAttributes, QueryTextMode};
8use crate::connection::PoolConnection;
9use crate::database::Database;
10use crate::metrics::Metrics;
11use crate::transaction::Transaction;
12
13/// Shared state propagated to every wrapper type derived from a pool.
14#[derive(Debug, Clone)]
15pub(crate) struct SharedState {
16 pub attrs: Arc<ConnectionAttributes>,
17 pub metrics: Arc<Metrics>,
18}
19
20/// Builder for constructing an instrumented [`Pool`] from a raw `sqlx::Pool`.
21///
22/// The builder auto-extracts connection attributes (host, port, database namespace) from
23/// the underlying connect options via the [`Database`] trait, then lets you override any of
24/// them before calling [`build`](Self::build). Settings on the wrapped `sqlx::Pool` itself
25/// (max connections, idle timeout, etc.) should be applied to the `sqlx::Pool` *before*
26/// passing it to the builder – `sqlx-otel` does not duplicate `SQLx`'s configuration
27/// surface.
28///
29/// # Example
30///
31/// ```no_run
32/// # #[cfg(feature = "sqlite")]
33/// # async fn _doc() -> Result<(), sqlx::Error> {
34/// use sqlx_otel::{PoolBuilder, QueryTextMode};
35/// use std::time::Duration;
36///
37/// let raw = sqlx::SqlitePool::connect(":memory:").await?;
38/// let pool = PoolBuilder::from(raw)
39/// .with_database("my_db")
40/// .with_query_text_mode(QueryTextMode::Obfuscated)
41/// .with_pool_name("my-service-db")
42/// .with_pool_metrics_interval(Duration::from_secs(5))
43/// .build();
44/// # let _ = pool;
45/// # Ok(())
46/// # }
47/// ```
48#[derive(Debug)]
49pub struct PoolBuilder<DB: sqlx::Database> {
50 pool: sqlx::Pool<DB>,
51 host: Option<String>,
52 port: Option<u16>,
53 namespace: Option<String>,
54 network_peer_address: Option<String>,
55 network_peer_port: Option<u16>,
56 query_text_mode: QueryTextMode,
57 pool_name: Option<String>,
58 pool_metrics_interval: Duration,
59}
60
61impl<DB: Database> From<sqlx::Pool<DB>> for PoolBuilder<DB> {
62 /// Create a builder from an existing `sqlx::Pool`, auto-extracting connection
63 /// attributes from the backend's connect options.
64 fn from(pool: sqlx::Pool<DB>) -> Self {
65 let (host, port, namespace) = DB::connection_attributes(&pool);
66 Self {
67 pool,
68 host,
69 port,
70 namespace,
71 network_peer_address: None,
72 network_peer_port: None,
73 query_text_mode: QueryTextMode::default(),
74 pool_name: None,
75 pool_metrics_interval: Duration::from_secs(10),
76 }
77 }
78}
79
80impl<DB: Database> PoolBuilder<DB> {
81 /// Override the `db.namespace` attribute (the database name).
82 #[must_use]
83 pub fn with_database(mut self, database: impl Into<String>) -> Self {
84 self.namespace = Some(database.into());
85 self
86 }
87
88 /// Override the `server.address` attribute (the logical hostname).
89 #[must_use]
90 pub fn with_host(mut self, host: impl Into<String>) -> Self {
91 self.host = Some(host.into());
92 self
93 }
94
95 /// Override the `server.port` attribute.
96 #[must_use]
97 pub fn with_port(mut self, port: u16) -> Self {
98 self.port = Some(port);
99 self
100 }
101
102 /// Set the `network.peer.address` attribute (the resolved IP address).
103 #[must_use]
104 pub fn with_network_peer_address(mut self, address: impl Into<String>) -> Self {
105 self.network_peer_address = Some(address.into());
106 self
107 }
108
109 /// Set the `network.peer.port` attribute (the resolved port).
110 #[must_use]
111 pub fn with_network_peer_port(mut self, port: u16) -> Self {
112 self.network_peer_port = Some(port);
113 self
114 }
115
116 /// Configure how `db.query.text` is captured on spans. Defaults to
117 /// [`QueryTextMode::Full`].
118 #[must_use]
119 pub fn with_query_text_mode(mut self, mode: QueryTextMode) -> Self {
120 self.query_text_mode = mode;
121 self
122 }
123
124 /// Set the `db.client.connection.pool.name` attribute and enable the
125 /// `db.client.connection.count` polling task.
126 ///
127 /// When a runtime feature (`runtime-tokio` or `runtime-async-std`) is also enabled, a
128 /// background task is spawned that periodically records `db.client.connection.count`
129 /// (idle / used). See [`with_pool_metrics_interval`](Self::with_pool_metrics_interval)
130 /// to configure the polling frequency. The task is cancelled when the [`Pool`] (and
131 /// every clone of it) is dropped.
132 ///
133 /// **Without a runtime feature, the name is recorded but no `connection.count` task is
134 /// spawned and the gauge is never reported.** All other operation- and pool-level
135 /// metrics still work in that configuration.
136 #[must_use]
137 pub fn with_pool_name(mut self, name: impl Into<String>) -> Self {
138 self.pool_name = Some(name.into());
139 self
140 }
141
142 /// Set the polling interval for `db.client.connection.count`. Defaults to 10 seconds.
143 ///
144 /// Has no effect unless [`with_pool_name`](Self::with_pool_name) is also called and a
145 /// runtime feature is enabled.
146 #[must_use]
147 pub fn with_pool_metrics_interval(mut self, interval: Duration) -> Self {
148 self.pool_metrics_interval = interval;
149 self
150 }
151
152 /// Consume the builder and produce an instrumented [`Pool`].
153 ///
154 /// At this point the static pool gauges (`db.client.connection.max`,
155 /// `db.client.connection.idle.max`, `db.client.connection.idle.min`) are recorded
156 /// once with the connection-level attributes – they do not change over the pool's
157 /// lifetime. The wait-time / use-time / timeout / pending-request instruments are
158 /// created here and updated inline on every `acquire()` and connection drop.
159 #[must_use]
160 pub fn build(self) -> Pool<DB> {
161 let metrics_shutdown = self.spawn_pool_metrics_task();
162
163 let attrs = Arc::new(ConnectionAttributes {
164 system: DB::SYSTEM,
165 host: self.host,
166 port: self.port,
167 namespace: self.namespace,
168 network_peer_address: self.network_peer_address,
169 network_peer_port: self.network_peer_port,
170 query_text_mode: self.query_text_mode,
171 });
172 let metrics = Arc::new(Metrics::new());
173 let meter = opentelemetry::global::meter("sqlx-otel");
174
175 // Record static pool configuration gauges once – these never change.
176 let max_conns = i64::from(self.pool.options().get_max_connections());
177 let min_conns = i64::from(self.pool.options().get_min_connections());
178 let base_attrs = attrs.base_key_values();
179
180 meter
181 .i64_gauge(semconv_metric::DB_CLIENT_CONNECTION_MAX)
182 .with_description("The maximum number of open connections allowed.")
183 .build()
184 .record(max_conns, &base_attrs);
185 meter
186 .i64_gauge(semconv_metric::DB_CLIENT_CONNECTION_IDLE_MAX)
187 .with_description("The maximum number of idle open connections allowed.")
188 .build()
189 .record(max_conns, &base_attrs);
190 meter
191 .i64_gauge(semconv_metric::DB_CLIENT_CONNECTION_IDLE_MIN)
192 .with_description("The minimum number of idle open connections allowed.")
193 .build()
194 .record(min_conns, &base_attrs);
195
196 Pool {
197 inner: self.pool,
198 state: SharedState { attrs, metrics },
199 metrics_shutdown,
200 wait_time: Arc::new(
201 meter
202 .f64_histogram(semconv_metric::DB_CLIENT_CONNECTION_WAIT_TIME)
203 .with_unit("s")
204 .with_description(
205 "The time it took to obtain an open connection from the pool.",
206 )
207 .build(),
208 ),
209 use_time: Arc::new(
210 meter
211 .f64_histogram(semconv_metric::DB_CLIENT_CONNECTION_USE_TIME)
212 .with_unit("s")
213 .with_description(
214 "The time between borrowing a connection and returning it to the pool.",
215 )
216 .build(),
217 ),
218 timeouts: Arc::new(
219 meter
220 .u64_counter(semconv_metric::DB_CLIENT_CONNECTION_TIMEOUTS)
221 .with_description(
222 "The number of connection pool acquire attempts that timed out.",
223 )
224 .build(),
225 ),
226 pending_requests: Arc::new(
227 meter
228 .i64_up_down_counter(semconv_metric::DB_CLIENT_CONNECTION_PENDING_REQUESTS)
229 .with_description("The number of pending requests for an open connection.")
230 .build(),
231 ),
232 }
233 }
234
235 /// Spawn the pool metrics background task if a pool name is set and a runtime is
236 /// available. Returns the shutdown handle (or `None`).
237 fn spawn_pool_metrics_task(&self) -> Option<crate::pool_metrics::ShutdownHandle> {
238 let name = self.pool_name.as_ref()?;
239
240 // Prefer tokio if both runtimes are enabled.
241 #[cfg(feature = "runtime-tokio")]
242 {
243 Some(
244 crate::pool_metrics::spawn::<crate::runtime::TokioRuntime, DB>(
245 self.pool.clone(),
246 name.clone(),
247 self.pool_metrics_interval,
248 ),
249 )
250 }
251
252 #[cfg(all(feature = "runtime-async-std", not(feature = "runtime-tokio")))]
253 {
254 Some(crate::pool_metrics::spawn::<
255 crate::runtime::AsyncStdRuntime,
256 DB,
257 >(
258 self.pool.clone(),
259 name.clone(),
260 self.pool_metrics_interval,
261 ))
262 }
263
264 #[cfg(not(any(feature = "runtime-tokio", feature = "runtime-async-std")))]
265 {
266 let _ = name;
267 None
268 }
269 }
270}
271
272/// An instrumented wrapper around `sqlx::Pool` that emits OpenTelemetry spans and metrics
273/// for every database operation.
274///
275/// Create one via [`PoolBuilder`]. The wrapper is a drop-in replacement for `sqlx::Pool`:
276/// `&Pool<DB>` implements [`sqlx::Executor`], so you can pass it straight into
277/// `sqlx::query(...)`, `sqlx::query_as(...)`, and friends. Connections acquired via
278/// [`acquire`](Self::acquire) and transactions started via [`begin`](Self::begin) inherit
279/// the same instrumentation and produce spans / metrics with identical connection-level
280/// attributes.
281///
282/// `Clone` is cheap – the inner `sqlx::Pool`, the connection-level attribute set, and the
283/// metric instruments are all `Arc`-shared. Cloning never copies state; cloned pools share
284/// the same underlying connection pool and metric stream.
285///
286/// # Example
287///
288/// ```no_run
289/// # #[cfg(feature = "sqlite")]
290/// # async fn _doc() -> Result<(), sqlx::Error> {
291/// use sqlx_otel::PoolBuilder;
292///
293/// let raw = sqlx::SqlitePool::connect(":memory:").await?;
294/// let pool = PoolBuilder::from(raw).build();
295///
296/// // Pass `&pool` anywhere a `sqlx::Executor` is expected.
297/// let row: (i64,) = sqlx::query_as("SELECT 1").fetch_one(&pool).await?;
298/// assert_eq!(row.0, 1);
299/// # Ok(())
300/// # }
301/// ```
302///
303/// See also [`with_annotations`](Self::with_annotations) for per-query semantic-convention
304/// attributes, and [`crate::QueryAnnotateExt`] for attaching annotations on the query side
305/// instead of the executor side.
306#[derive(Debug)]
307pub struct Pool<DB: sqlx::Database> {
308 pub(crate) inner: sqlx::Pool<DB>,
309 pub(crate) state: SharedState,
310 /// Dropping this handle signals the background polling task to stop.
311 metrics_shutdown: Option<crate::pool_metrics::ShutdownHandle>,
312 /// Histogram for `db.client.connection.wait_time`, recorded on each `acquire()`.
313 wait_time: Arc<opentelemetry::metrics::Histogram<f64>>,
314 /// Histogram for `db.client.connection.use_time`, recorded when a connection is dropped.
315 pub(crate) use_time: Arc<opentelemetry::metrics::Histogram<f64>>,
316 /// Counter for `db.client.connection.timeouts`, incremented on `PoolTimedOut`.
317 timeouts: Arc<opentelemetry::metrics::Counter<u64>>,
318 /// Up/down counter for `db.client.connection.pending_requests`, tracks callers
319 /// currently waiting in `acquire()`.
320 pending_requests: Arc<opentelemetry::metrics::UpDownCounter<i64>>,
321}
322
323impl<DB: sqlx::Database> Clone for Pool<DB> {
324 fn clone(&self) -> Self {
325 Self {
326 inner: self.inner.clone(),
327 state: self.state.clone(),
328 metrics_shutdown: self.metrics_shutdown.clone(),
329 wait_time: self.wait_time.clone(),
330 use_time: self.use_time.clone(),
331 timeouts: self.timeouts.clone(),
332 pending_requests: self.pending_requests.clone(),
333 }
334 }
335}
336
337impl<DB: Database> Pool<DB> {
338 /// Acquire a pooled connection instrumented for OpenTelemetry.
339 ///
340 /// Records `db.client.connection.wait_time` (time spent waiting for a connection),
341 /// tracks `db.client.connection.pending_requests` while the call is in flight, and
342 /// increments `db.client.connection.timeouts` on `sqlx::Error::PoolTimedOut`. The
343 /// returned [`PoolConnection`] records `db.client.connection.use_time` when dropped
344 /// and is itself an [`sqlx::Executor`] via `&mut conn`.
345 ///
346 /// # Errors
347 ///
348 /// Returns `sqlx::Error` if a connection cannot be obtained from the pool – typically
349 /// `PoolTimedOut` when the configured acquire timeout elapses, or `PoolClosed` after
350 /// [`close`](Self::close).
351 pub async fn acquire(&self) -> Result<PoolConnection<DB>, sqlx::Error> {
352 let attrs = self.state.attrs.base_key_values();
353 self.pending_requests.add(1, &attrs);
354 let start = std::time::Instant::now();
355 let result = self.inner.acquire().await;
356 self.pending_requests.add(-1, &attrs);
357 self.wait_time.record(start.elapsed().as_secs_f64(), &attrs);
358
359 if let Err(sqlx::Error::PoolTimedOut) = &result {
360 self.timeouts.add(1, &attrs);
361 }
362
363 result.map(|inner| PoolConnection {
364 inner,
365 state: self.state.clone(),
366 use_time: self.use_time.clone(),
367 acquired_at: std::time::Instant::now(),
368 base_attrs: attrs,
369 })
370 }
371
372 /// Begin a new transaction instrumented for OpenTelemetry.
373 ///
374 /// The returned [`Transaction`] implements `sqlx::Executor` via `&mut tx` and emits
375 /// the same per-operation spans and metrics as the pool itself. Call
376 /// [`commit`](Transaction::commit) or [`rollback`](Transaction::rollback) to terminate
377 /// it; dropping the value without doing either rolls back implicitly (per `SQLx`'s
378 /// usual behaviour).
379 ///
380 /// # Errors
381 ///
382 /// Returns `sqlx::Error` if `BEGIN` fails – typically due to a connection problem or
383 /// because the underlying connection cannot start a new transaction.
384 pub async fn begin(&self) -> Result<Transaction<'_, DB>, sqlx::Error> {
385 self.inner.begin().await.map(|inner| Transaction {
386 inner,
387 state: self.state.clone(),
388 })
389 }
390
391 /// Shut down the pool, waiting for all connections to be released.
392 pub async fn close(&self) {
393 self.inner.close().await;
394 }
395
396 /// Returns `true` if the pool has been closed.
397 #[must_use]
398 pub fn is_closed(&self) -> bool {
399 self.inner.is_closed()
400 }
401
402 /// Return an annotated executor that attaches per-query semantic-convention attributes
403 /// (`db.operation.name`, `db.collection.name`, `db.query.summary`,
404 /// `db.stored_procedure.name`) to every span created by the next operation.
405 ///
406 /// The returned wrapper borrows the pool and implements `sqlx::Executor`. Use the
407 /// query-side equivalent ([`crate::QueryAnnotateExt`]) when the annotation belongs
408 /// next to the query text rather than next to the executor.
409 ///
410 /// # Example
411 ///
412 /// ```no_run
413 /// # #[cfg(feature = "sqlite")]
414 /// # async fn _doc() -> Result<(), sqlx::Error> {
415 /// # use sqlx_otel::PoolBuilder;
416 /// use sqlx::Executor as _;
417 /// use sqlx_otel::QueryAnnotations;
418 /// # let pool = PoolBuilder::from(sqlx::SqlitePool::connect(":memory:").await?).build();
419 ///
420 /// pool.with_annotations(
421 /// QueryAnnotations::new()
422 /// .operation("SELECT")
423 /// .collection("users"),
424 /// )
425 /// .fetch_all("SELECT * FROM users")
426 /// .await?;
427 /// # Ok(())
428 /// # }
429 /// ```
430 #[must_use]
431 pub fn with_annotations(&self, annotations: QueryAnnotations) -> Annotated<'_, Self> {
432 Annotated {
433 inner: self,
434 annotations,
435 state: self.state.clone(),
436 }
437 }
438
439 /// Shorthand for annotating the next operation with `db.operation.name` and
440 /// `db.collection.name`.
441 ///
442 /// Equivalent to `self.with_annotations(QueryAnnotations::new().operation(op).collection(coll))`.
443 #[must_use]
444 pub fn with_operation(
445 &self,
446 operation: impl Into<String>,
447 collection: impl Into<String>,
448 ) -> Annotated<'_, Self> {
449 self.with_annotations(
450 QueryAnnotations::new()
451 .operation(operation)
452 .collection(collection),
453 )
454 }
455}