Skip to main content

sqlx_otel/
pool.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use opentelemetry_semantic_conventions::metric as semconv_metric;
5
6use crate::annotations::{Annotated, QueryAnnotations};
7use crate::attributes::{ConnectionAttributes, QueryTextMode};
8use crate::connection::PoolConnection;
9use crate::database::Database;
10use crate::metrics::Metrics;
11use crate::transaction::Transaction;
12
13/// Shared state propagated to every wrapper type derived from a pool.
14#[derive(Debug, Clone)]
15pub(crate) struct SharedState {
16    pub attrs: Arc<ConnectionAttributes>,
17    pub metrics: Arc<Metrics>,
18}
19
20/// Builder for constructing an instrumented [`Pool`] from a raw `sqlx::Pool`.
21///
22/// The builder auto-extracts connection attributes (host, port, database namespace) from
23/// the underlying connect options via the [`Database`] trait, then lets you override any of
24/// them before calling [`build`](Self::build). Settings on the wrapped `sqlx::Pool` itself
25/// (max connections, idle timeout, etc.) should be applied to the `sqlx::Pool` *before*
26/// passing it to the builder – `sqlx-otel` does not duplicate `SQLx`'s configuration
27/// surface.
28///
29/// # Example
30///
31/// ```no_run
32/// # #[cfg(feature = "sqlite")]
33/// # async fn _doc() -> Result<(), sqlx::Error> {
34/// use sqlx_otel::{PoolBuilder, QueryTextMode};
35/// use std::time::Duration;
36///
37/// let raw = sqlx::SqlitePool::connect(":memory:").await?;
38/// let pool = PoolBuilder::from(raw)
39///     .with_database("my_db")
40///     .with_query_text_mode(QueryTextMode::Obfuscated)
41///     .with_pool_name("my-service-db")
42///     .with_pool_metrics_interval(Duration::from_secs(5))
43///     .build();
44/// # let _ = pool;
45/// # Ok(())
46/// # }
47/// ```
48#[derive(Debug)]
49pub struct PoolBuilder<DB: sqlx::Database> {
50    pool: sqlx::Pool<DB>,
51    host: Option<String>,
52    port: Option<u16>,
53    namespace: Option<String>,
54    network_peer_address: Option<String>,
55    network_peer_port: Option<u16>,
56    query_text_mode: QueryTextMode,
57    pool_name: Option<String>,
58    pool_metrics_interval: Duration,
59}
60
61impl<DB: Database> From<sqlx::Pool<DB>> for PoolBuilder<DB> {
62    /// Create a builder from an existing `sqlx::Pool`, auto-extracting connection
63    /// attributes from the backend's connect options.
64    fn from(pool: sqlx::Pool<DB>) -> Self {
65        let (host, port, namespace) = DB::connection_attributes(&pool);
66        Self {
67            pool,
68            host,
69            port,
70            namespace,
71            network_peer_address: None,
72            network_peer_port: None,
73            query_text_mode: QueryTextMode::default(),
74            pool_name: None,
75            pool_metrics_interval: Duration::from_secs(10),
76        }
77    }
78}
79
80impl<DB: Database> PoolBuilder<DB> {
81    /// Override the `db.namespace` attribute (the database name).
82    #[must_use]
83    pub fn with_database(mut self, database: impl Into<String>) -> Self {
84        self.namespace = Some(database.into());
85        self
86    }
87
88    /// Override the `server.address` attribute (the logical hostname).
89    #[must_use]
90    pub fn with_host(mut self, host: impl Into<String>) -> Self {
91        self.host = Some(host.into());
92        self
93    }
94
95    /// Override the `server.port` attribute.
96    #[must_use]
97    pub fn with_port(mut self, port: u16) -> Self {
98        self.port = Some(port);
99        self
100    }
101
102    /// Set the `network.peer.address` attribute (the resolved IP address).
103    #[must_use]
104    pub fn with_network_peer_address(mut self, address: impl Into<String>) -> Self {
105        self.network_peer_address = Some(address.into());
106        self
107    }
108
109    /// Set the `network.peer.port` attribute (the resolved port).
110    #[must_use]
111    pub fn with_network_peer_port(mut self, port: u16) -> Self {
112        self.network_peer_port = Some(port);
113        self
114    }
115
116    /// Configure how `db.query.text` is captured on spans. Defaults to
117    /// [`QueryTextMode::Full`].
118    #[must_use]
119    pub fn with_query_text_mode(mut self, mode: QueryTextMode) -> Self {
120        self.query_text_mode = mode;
121        self
122    }
123
124    /// Set the `db.client.connection.pool.name` attribute and enable the
125    /// `db.client.connection.count` polling task.
126    ///
127    /// When a runtime feature (`runtime-tokio` or `runtime-async-std`) is also enabled, a
128    /// background task is spawned that periodically records `db.client.connection.count`
129    /// (idle / used). See [`with_pool_metrics_interval`](Self::with_pool_metrics_interval)
130    /// to configure the polling frequency. The task is cancelled when the [`Pool`] (and
131    /// every clone of it) is dropped.
132    ///
133    /// **Without a runtime feature, the name is recorded but no `connection.count` task is
134    /// spawned and the gauge is never reported.** All other operation- and pool-level
135    /// metrics still work in that configuration.
136    #[must_use]
137    pub fn with_pool_name(mut self, name: impl Into<String>) -> Self {
138        self.pool_name = Some(name.into());
139        self
140    }
141
142    /// Set the polling interval for `db.client.connection.count`. Defaults to 10 seconds.
143    ///
144    /// Has no effect unless [`with_pool_name`](Self::with_pool_name) is also called and a
145    /// runtime feature is enabled.
146    #[must_use]
147    pub fn with_pool_metrics_interval(mut self, interval: Duration) -> Self {
148        self.pool_metrics_interval = interval;
149        self
150    }
151
152    /// Consume the builder and produce an instrumented [`Pool`].
153    ///
154    /// At this point the static pool gauges (`db.client.connection.max`,
155    /// `db.client.connection.idle.max`, `db.client.connection.idle.min`) are recorded
156    /// once with the connection-level attributes – they do not change over the pool's
157    /// lifetime. The wait-time / use-time / timeout / pending-request instruments are
158    /// created here and updated inline on every `acquire()` and connection drop.
159    #[must_use]
160    pub fn build(self) -> Pool<DB> {
161        let metrics_shutdown = self.spawn_pool_metrics_task();
162
163        let attrs = Arc::new(ConnectionAttributes {
164            system: DB::SYSTEM,
165            host: self.host,
166            port: self.port,
167            namespace: self.namespace,
168            network_peer_address: self.network_peer_address,
169            network_peer_port: self.network_peer_port,
170            query_text_mode: self.query_text_mode,
171        });
172        let metrics = Arc::new(Metrics::new());
173        let meter = opentelemetry::global::meter("sqlx-otel");
174
175        // Record static pool configuration gauges once – these never change.
176        let max_conns = i64::from(self.pool.options().get_max_connections());
177        let min_conns = i64::from(self.pool.options().get_min_connections());
178        let base_attrs = attrs.base_key_values();
179
180        meter
181            .i64_gauge(semconv_metric::DB_CLIENT_CONNECTION_MAX)
182            .with_description("The maximum number of open connections allowed.")
183            .build()
184            .record(max_conns, &base_attrs);
185        meter
186            .i64_gauge(semconv_metric::DB_CLIENT_CONNECTION_IDLE_MAX)
187            .with_description("The maximum number of idle open connections allowed.")
188            .build()
189            .record(max_conns, &base_attrs);
190        meter
191            .i64_gauge(semconv_metric::DB_CLIENT_CONNECTION_IDLE_MIN)
192            .with_description("The minimum number of idle open connections allowed.")
193            .build()
194            .record(min_conns, &base_attrs);
195
196        Pool {
197            inner: self.pool,
198            state: SharedState { attrs, metrics },
199            metrics_shutdown,
200            wait_time: Arc::new(
201                meter
202                    .f64_histogram(semconv_metric::DB_CLIENT_CONNECTION_WAIT_TIME)
203                    .with_unit("s")
204                    .with_description(
205                        "The time it took to obtain an open connection from the pool.",
206                    )
207                    .build(),
208            ),
209            use_time: Arc::new(
210                meter
211                    .f64_histogram(semconv_metric::DB_CLIENT_CONNECTION_USE_TIME)
212                    .with_unit("s")
213                    .with_description(
214                        "The time between borrowing a connection and returning it to the pool.",
215                    )
216                    .build(),
217            ),
218            timeouts: Arc::new(
219                meter
220                    .u64_counter(semconv_metric::DB_CLIENT_CONNECTION_TIMEOUTS)
221                    .with_description(
222                        "The number of connection pool acquire attempts that timed out.",
223                    )
224                    .build(),
225            ),
226            pending_requests: Arc::new(
227                meter
228                    .i64_up_down_counter(semconv_metric::DB_CLIENT_CONNECTION_PENDING_REQUESTS)
229                    .with_description("The number of pending requests for an open connection.")
230                    .build(),
231            ),
232        }
233    }
234
235    /// Spawn the pool metrics background task if a pool name is set and a runtime is
236    /// available. Returns the shutdown handle (or `None`).
237    fn spawn_pool_metrics_task(&self) -> Option<crate::pool_metrics::ShutdownHandle> {
238        let name = self.pool_name.as_ref()?;
239
240        // Prefer tokio if both runtimes are enabled.
241        #[cfg(feature = "runtime-tokio")]
242        {
243            Some(
244                crate::pool_metrics::spawn::<crate::runtime::TokioRuntime, DB>(
245                    self.pool.clone(),
246                    name.clone(),
247                    self.pool_metrics_interval,
248                ),
249            )
250        }
251
252        #[cfg(all(feature = "runtime-async-std", not(feature = "runtime-tokio")))]
253        {
254            Some(crate::pool_metrics::spawn::<
255                crate::runtime::AsyncStdRuntime,
256                DB,
257            >(
258                self.pool.clone(),
259                name.clone(),
260                self.pool_metrics_interval,
261            ))
262        }
263
264        #[cfg(not(any(feature = "runtime-tokio", feature = "runtime-async-std")))]
265        {
266            let _ = name;
267            None
268        }
269    }
270}
271
272/// An instrumented wrapper around `sqlx::Pool` that emits OpenTelemetry spans and metrics
273/// for every database operation.
274///
275/// Create one via [`PoolBuilder`]. The wrapper is a drop-in replacement for `sqlx::Pool`:
276/// `&Pool<DB>` implements [`sqlx::Executor`], so you can pass it straight into
277/// `sqlx::query(...)`, `sqlx::query_as(...)`, and friends. Connections acquired via
278/// [`acquire`](Self::acquire) and transactions started via [`begin`](Self::begin) inherit
279/// the same instrumentation and produce spans / metrics with identical connection-level
280/// attributes.
281///
282/// `Clone` is cheap – the inner `sqlx::Pool`, the connection-level attribute set, and the
283/// metric instruments are all `Arc`-shared. Cloning never copies state; cloned pools share
284/// the same underlying connection pool and metric stream.
285///
286/// # Example
287///
288/// ```no_run
289/// # #[cfg(feature = "sqlite")]
290/// # async fn _doc() -> Result<(), sqlx::Error> {
291/// use sqlx_otel::PoolBuilder;
292///
293/// let raw = sqlx::SqlitePool::connect(":memory:").await?;
294/// let pool = PoolBuilder::from(raw).build();
295///
296/// // Pass `&pool` anywhere a `sqlx::Executor` is expected.
297/// let row: (i64,) = sqlx::query_as("SELECT 1").fetch_one(&pool).await?;
298/// assert_eq!(row.0, 1);
299/// # Ok(())
300/// # }
301/// ```
302///
303/// See also [`with_annotations`](Self::with_annotations) for per-query semantic-convention
304/// attributes, and [`crate::QueryAnnotateExt`] for attaching annotations on the query side
305/// instead of the executor side.
306#[derive(Debug)]
307pub struct Pool<DB: sqlx::Database> {
308    pub(crate) inner: sqlx::Pool<DB>,
309    pub(crate) state: SharedState,
310    /// Dropping this handle signals the background polling task to stop.
311    metrics_shutdown: Option<crate::pool_metrics::ShutdownHandle>,
312    /// Histogram for `db.client.connection.wait_time`, recorded on each `acquire()`.
313    wait_time: Arc<opentelemetry::metrics::Histogram<f64>>,
314    /// Histogram for `db.client.connection.use_time`, recorded when a connection is dropped.
315    pub(crate) use_time: Arc<opentelemetry::metrics::Histogram<f64>>,
316    /// Counter for `db.client.connection.timeouts`, incremented on `PoolTimedOut`.
317    timeouts: Arc<opentelemetry::metrics::Counter<u64>>,
318    /// Up/down counter for `db.client.connection.pending_requests`, tracks callers
319    /// currently waiting in `acquire()`.
320    pending_requests: Arc<opentelemetry::metrics::UpDownCounter<i64>>,
321}
322
323impl<DB: sqlx::Database> Clone for Pool<DB> {
324    fn clone(&self) -> Self {
325        Self {
326            inner: self.inner.clone(),
327            state: self.state.clone(),
328            metrics_shutdown: self.metrics_shutdown.clone(),
329            wait_time: self.wait_time.clone(),
330            use_time: self.use_time.clone(),
331            timeouts: self.timeouts.clone(),
332            pending_requests: self.pending_requests.clone(),
333        }
334    }
335}
336
337impl<DB: Database> Pool<DB> {
338    /// Acquire a pooled connection instrumented for OpenTelemetry.
339    ///
340    /// Records `db.client.connection.wait_time` (time spent waiting for a connection),
341    /// tracks `db.client.connection.pending_requests` while the call is in flight, and
342    /// increments `db.client.connection.timeouts` on `sqlx::Error::PoolTimedOut`. The
343    /// returned [`PoolConnection`] records `db.client.connection.use_time` when dropped
344    /// and is itself an [`sqlx::Executor`] via `&mut conn`.
345    ///
346    /// # Errors
347    ///
348    /// Returns `sqlx::Error` if a connection cannot be obtained from the pool – typically
349    /// `PoolTimedOut` when the configured acquire timeout elapses, or `PoolClosed` after
350    /// [`close`](Self::close).
351    pub async fn acquire(&self) -> Result<PoolConnection<DB>, sqlx::Error> {
352        let attrs = self.state.attrs.base_key_values();
353        self.pending_requests.add(1, &attrs);
354        let start = std::time::Instant::now();
355        let result = self.inner.acquire().await;
356        self.pending_requests.add(-1, &attrs);
357        self.wait_time.record(start.elapsed().as_secs_f64(), &attrs);
358
359        if let Err(sqlx::Error::PoolTimedOut) = &result {
360            self.timeouts.add(1, &attrs);
361        }
362
363        result.map(|inner| PoolConnection {
364            inner,
365            state: self.state.clone(),
366            use_time: self.use_time.clone(),
367            acquired_at: std::time::Instant::now(),
368            base_attrs: attrs,
369        })
370    }
371
372    /// Begin a new transaction instrumented for OpenTelemetry.
373    ///
374    /// The returned [`Transaction`] implements `sqlx::Executor` via `&mut tx` and emits
375    /// the same per-operation spans and metrics as the pool itself. Call
376    /// [`commit`](Transaction::commit) or [`rollback`](Transaction::rollback) to terminate
377    /// it; dropping the value without doing either rolls back implicitly (per `SQLx`'s
378    /// usual behaviour).
379    ///
380    /// # Errors
381    ///
382    /// Returns `sqlx::Error` if `BEGIN` fails – typically due to a connection problem or
383    /// because the underlying connection cannot start a new transaction.
384    pub async fn begin(&self) -> Result<Transaction<'_, DB>, sqlx::Error> {
385        self.inner.begin().await.map(|inner| Transaction {
386            inner,
387            state: self.state.clone(),
388        })
389    }
390
391    /// Shut down the pool, waiting for all connections to be released.
392    pub async fn close(&self) {
393        self.inner.close().await;
394    }
395
396    /// Returns `true` if the pool has been closed.
397    #[must_use]
398    pub fn is_closed(&self) -> bool {
399        self.inner.is_closed()
400    }
401
402    /// Return an annotated executor that attaches per-query semantic-convention attributes
403    /// (`db.operation.name`, `db.collection.name`, `db.query.summary`,
404    /// `db.stored_procedure.name`) to every span created by the next operation.
405    ///
406    /// The returned wrapper borrows the pool and implements `sqlx::Executor`. Use the
407    /// query-side equivalent ([`crate::QueryAnnotateExt`]) when the annotation belongs
408    /// next to the query text rather than next to the executor.
409    ///
410    /// # Example
411    ///
412    /// ```no_run
413    /// # #[cfg(feature = "sqlite")]
414    /// # async fn _doc() -> Result<(), sqlx::Error> {
415    /// # use sqlx_otel::PoolBuilder;
416    /// use sqlx::Executor as _;
417    /// use sqlx_otel::QueryAnnotations;
418    /// # let pool = PoolBuilder::from(sqlx::SqlitePool::connect(":memory:").await?).build();
419    ///
420    /// pool.with_annotations(
421    ///     QueryAnnotations::new()
422    ///         .operation("SELECT")
423    ///         .collection("users"),
424    /// )
425    /// .fetch_all("SELECT * FROM users")
426    /// .await?;
427    /// # Ok(())
428    /// # }
429    /// ```
430    #[must_use]
431    pub fn with_annotations(&self, annotations: QueryAnnotations) -> Annotated<'_, Self> {
432        Annotated {
433            inner: self,
434            annotations,
435            state: self.state.clone(),
436        }
437    }
438
439    /// Shorthand for annotating the next operation with `db.operation.name` and
440    /// `db.collection.name`.
441    ///
442    /// Equivalent to `self.with_annotations(QueryAnnotations::new().operation(op).collection(coll))`.
443    #[must_use]
444    pub fn with_operation(
445        &self,
446        operation: impl Into<String>,
447        collection: impl Into<String>,
448    ) -> Annotated<'_, Self> {
449        self.with_annotations(
450            QueryAnnotations::new()
451                .operation(operation)
452                .collection(collection),
453        )
454    }
455}