sqlx-otel 0.2.0

Thin wrapper around SQLx that emits OpenTelemetry spans and metrics following the database client semantic conventions.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
#![cfg(feature = "sqlite")]

mod common;

use common::attr;
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
use serial_test::serial;
use sqlx::Executor as _;
use sqlx_otel::PoolBuilder;
use std::time::Duration;

/// Pool name handed to `PoolBuilder::with_pool_name` by the tests below and matched against
/// the `db.client.connection.pool.name` attribute on reported data points.
const POOL_NAME: &str = "test-pool";

/// Returns the first metric called `name`, searching every scope of every entry in
/// `resource_metrics` in order. Yields `None` when no scope carries such a metric.
fn find_metric<'a>(
    resource_metrics: &'a [opentelemetry_sdk::metrics::data::ResourceMetrics],
    name: &str,
) -> Option<&'a opentelemetry_sdk::metrics::data::Metric> {
    for resource in resource_metrics {
        for scope in resource.scope_metrics() {
            if let Some(hit) = scope.metrics().find(|candidate| candidate.name() == name) {
                return Some(hit);
            }
        }
    }
    None
}

/// Reads an i64 gauge data point selected by attribute.
///
/// Looks up the metric called `name` and returns the value of the first data point that
/// carries an attribute whose key equals `filter_key` and whose stringified value equals
/// `filter_value`. Returns `None` when the metric is missing, is not an i64 gauge, or has
/// no matching data point.
fn gauge_value(
    resource_metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
    name: &str,
    filter_key: &str,
    filter_value: &str,
) -> Option<i64> {
    let AggregatedMetrics::I64(MetricData::Gauge(gauge)) =
        find_metric(resource_metrics, name)?.data()
    else {
        return None;
    };

    gauge
        .data_points()
        .find(|point| {
            point.attributes().any(|attribute| {
                attribute.key.as_str() == filter_key && attribute.value.to_string() == filter_value
            })
        })
        .map(opentelemetry_sdk::metrics::data::GaugeDataPoint::value)
}

/// Reads the first data point of the i64 gauge called `name`, without looking at its
/// attributes. Returns `None` when the metric is missing, is not an i64 gauge, or has no
/// data points.
fn gauge_any_value(
    resource_metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
    name: &str,
) -> Option<i64> {
    let AggregatedMetrics::I64(MetricData::Gauge(gauge)) =
        find_metric(resource_metrics, name)?.data()
    else {
        return None;
    };

    let first_point = gauge.data_points().next();
    first_point.map(opentelemetry_sdk::metrics::data::GaugeDataPoint::value)
}

/// Repeatedly invokes `f` until it yields `Some`, giving up once `timeout` has elapsed.
///
/// The probe always runs at least once, and it runs *before* the deadline check, so a
/// value that becomes available during the final sleep is still observed. Between
/// attempts the task sleeps for 10ms: short enough to pick up background-task-driven
/// metrics promptly, long enough to keep the busy-wait cheap. The caller's `timeout` is a
/// flake threshold, not an expected duration.
///
/// Returns the first `Some` produced by `f`, or `None` if the deadline passes first.
async fn poll_for<F, T>(timeout: Duration, mut f: F) -> Option<T>
where
    F: FnMut() -> Option<T>,
{
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    let give_up_at = std::time::Instant::now() + timeout;
    loop {
        if let found @ Some(_) = f() {
            return found;
        }
        if give_up_at <= std::time::Instant::now() {
            return None;
        }
        tokio::time::sleep(POLL_INTERVAL).await;
    }
}

// ===========================================================================
// Static pool configuration gauges (no runtime needed)
// ===========================================================================

/// A pool capped at five connections must report 5 on the `db.client.connection.max` gauge.
#[tokio::test]
#[serial]
async fn connection_max_matches_pool_options() {
    let tel = common::TestTelemetry::install();
    let options = sqlx::pool::PoolOptions::<sqlx::Sqlite>::new().max_connections(5);
    let raw = options.connect(":memory:").await.unwrap();
    // Keep the instrumented pool alive until the snapshot has been taken.
    let _pool = PoolBuilder::from(raw).build();

    let snapshot = tel.metrics();
    assert_eq!(
        gauge_any_value(&snapshot, "db.client.connection.max"),
        Some(5),
        "max connections should be 5"
    );
}

/// A pool configured with `min_connections(2)` must report 2 on the
/// `db.client.connection.idle.min` gauge.
#[tokio::test]
#[serial]
async fn connection_idle_min_matches_pool_options() {
    let tel = common::TestTelemetry::install();
    let options = sqlx::pool::PoolOptions::<sqlx::Sqlite>::new().min_connections(2);
    let raw = options.connect(":memory:").await.unwrap();
    // Keep the instrumented pool alive until the snapshot has been taken.
    let _pool = PoolBuilder::from(raw).build();

    let snapshot = tel.metrics();
    assert_eq!(
        gauge_any_value(&snapshot, "db.client.connection.idle.min"),
        Some(2),
        "idle min should be 2"
    );
}

/// With `max_connections(7)`, the `db.client.connection.idle.max` gauge is expected to
/// mirror that limit and read 7.
#[tokio::test]
#[serial]
async fn connection_idle_max_matches_max_connections() {
    let tel = common::TestTelemetry::install();
    let options = sqlx::pool::PoolOptions::<sqlx::Sqlite>::new().max_connections(7);
    let raw = options.connect(":memory:").await.unwrap();
    // Keep the instrumented pool alive until the snapshot has been taken.
    let _pool = PoolBuilder::from(raw).build();

    let snapshot = tel.metrics();
    assert_eq!(
        gauge_any_value(&snapshot, "db.client.connection.idle.max"),
        Some(7),
        "idle.max should equal max_connections"
    );
}

// ===========================================================================
// Inline metrics (no runtime needed)
// ===========================================================================

/// Acquiring (and immediately releasing) a connection must leave at least one recording
/// in the `db.client.connection.wait_time` histogram, which is an f64 histogram in seconds.
#[tokio::test]
#[serial]
async fn wait_time_recorded_on_acquire() {
    let tel = common::TestTelemetry::install();
    let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
    let pool = PoolBuilder::from(raw).build();

    // One acquire/release round trip is all that is needed to produce a recording.
    drop(pool.acquire().await.unwrap());

    let snapshot = tel.metrics();
    let wait_time = find_metric(&snapshot, "db.client.connection.wait_time")
        .expect("wait_time metric should be present");
    assert_eq!(wait_time.unit(), "s");

    let AggregatedMetrics::F64(MetricData::Histogram(histogram)) = wait_time.data() else {
        panic!("wait_time should be an f64 histogram");
    };
    let first_point = histogram
        .data_points()
        .next()
        .expect("should have data points");
    assert!(
        first_point.count() >= 1,
        "should have at least one recording"
    );
}

/// Dropping a connection that was held for a short while must leave at least one
/// recording in the `db.client.connection.use_time` histogram (f64, unit seconds).
#[tokio::test]
#[serial]
async fn use_time_recorded_on_connection_drop() {
    let tel = common::TestTelemetry::install();
    let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
    let pool = PoolBuilder::from(raw).build();

    // Hold the connection briefly before releasing it.
    let held = pool.acquire().await.unwrap();
    tokio::time::sleep(Duration::from_millis(10)).await;
    drop(held);

    let snapshot = tel.metrics();
    let use_time = find_metric(&snapshot, "db.client.connection.use_time")
        .expect("use_time metric should be present");
    assert_eq!(use_time.unit(), "s");

    let AggregatedMetrics::F64(MetricData::Histogram(histogram)) = use_time.data() else {
        panic!("use_time should be an f64 histogram");
    };
    let first_point = histogram
        .data_points()
        .next()
        .expect("should have data points");
    assert!(
        first_point.count() >= 1,
        "should have at least one recording"
    );
}

/// Exhausts a single-connection pool so that a second acquire hits the 10ms acquire
/// timeout, then expects the `db.client.connection.timeouts` u64 counter to total at
/// least one.
#[tokio::test]
#[serial]
async fn timeouts_counter_incremented_on_pool_timeout() {
    let tel = common::TestTelemetry::install();
    let options = sqlx::pool::PoolOptions::<sqlx::Sqlite>::new()
        .max_connections(1)
        .acquire_timeout(Duration::from_millis(10));
    let raw = options.connect(":memory:").await.unwrap();
    let pool = PoolBuilder::from(raw).build();

    // Hold the only connection so the next acquire has nothing to hand out.
    let _held = pool.acquire().await.unwrap();
    assert!(pool.acquire().await.is_err(), "should time out");

    let snapshot = tel.metrics();
    let timeouts = find_metric(&snapshot, "db.client.connection.timeouts")
        .expect("timeouts metric should be present");

    let AggregatedMetrics::U64(MetricData::Sum(counter)) = timeouts.data() else {
        panic!("timeouts should be a u64 sum/counter");
    };
    // Add up every data point: the assertion is on the overall total.
    let mut total: u64 = 0;
    for point in counter.data_points() {
        total += point.value();
    }
    assert!(total >= 1, "should have at least one timeout recorded");
}

/// After one acquire/release round trip the `db.client.connection.pending_requests`
/// i64 sum must exist and its first data point must be back at zero.
#[tokio::test]
#[serial]
async fn pending_requests_recorded_on_acquire() {
    let tel = common::TestTelemetry::install();
    let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
    let pool = PoolBuilder::from(raw).build();

    drop(pool.acquire().await.unwrap());

    let snapshot = tel.metrics();
    let pending = find_metric(&snapshot, "db.client.connection.pending_requests")
        .expect("pending_requests metric should be present");

    let AggregatedMetrics::I64(MetricData::Sum(sum)) = pending.data() else {
        panic!("pending_requests should be an i64 sum (UpDownCounter)");
    };
    let first_point = sum.data_points().next().expect("should have data points");
    assert_eq!(
        first_point.value(),
        0,
        "net pending should be 0 after release"
    );
}

/// Enabling pool metrics (name + sampling interval) must not interfere with span
/// emission: a single query still yields exactly one span tagged `db.system.name = sqlite`.
#[tokio::test]
#[serial]
async fn spans_still_emitted_with_pool_metrics() {
    let tel = common::TestTelemetry::install();
    let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
    let builder = PoolBuilder::from(raw)
        .with_pool_name(POOL_NAME)
        .with_pool_metrics_interval(Duration::from_millis(50));
    let pool = builder.build();

    // Only the emitted span matters here; the row itself is discarded.
    let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();

    let recorded = tel.spans();
    assert_eq!(recorded.len(), 1);

    let expected_system = opentelemetry::Value::String("sqlite".into());
    assert_eq!(attr(&recorded[0], "db.system.name"), Some(expected_system));
}

// ===========================================================================
// Debug impls
// ===========================================================================

/// The `Debug` rendering of an acquired connection must mention `PoolConnection`.
#[tokio::test]
#[serial]
async fn pool_connection_debug() {
    let _tel = common::TestTelemetry::install();
    let pool = {
        let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        PoolBuilder::from(raw).build()
    };

    let conn = pool.acquire().await.unwrap();
    let rendered = format!("{conn:?}");
    assert!(
        rendered.contains("PoolConnection"),
        "Debug output: {rendered}"
    );
}

/// Cloning an instrumented pool must produce a handle that is not reported as closed.
#[tokio::test]
#[serial]
async fn pool_clone() {
    let _tel = common::TestTelemetry::install();
    let pool = {
        let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        PoolBuilder::from(raw).build()
    };

    let duplicate = pool.clone();
    let closed = duplicate.is_closed();
    assert!(!closed);
}

/// With a pool name and metrics interval configured, the pool's `Debug` output must
/// mention both `Pool` and `ShutdownHandle`.
#[cfg(feature = "runtime-tokio")]
#[tokio::test]
#[serial]
async fn pool_debug_with_shutdown_handle() {
    let _tel = common::TestTelemetry::install();
    let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
    let builder = PoolBuilder::from(raw)
        .with_pool_name(POOL_NAME)
        .with_pool_metrics_interval(Duration::from_millis(50));
    let pool = builder.build();

    let rendered = format!("{pool:?}");
    assert!(rendered.contains("Pool"), "Debug output: {rendered}");

    let mentions_handle = rendered.contains("ShutdownHandle");
    assert!(
        mentions_handle,
        "Should contain ShutdownHandle: {rendered}"
    );
}

// ===========================================================================
// Connection count via background task (runtime-tokio)
// ===========================================================================

#[cfg(feature = "runtime-tokio")]
mod tokio_runtime {
    use super::*;

    /// Name of the gauge written by the background sampling task.
    const COUNT_METRIC: &str = "db.client.connection.count";
    /// Attribute that distinguishes the `idle` and `used` data points of [`COUNT_METRIC`].
    const STATE_ATTR: &str = "db.client.connection.state";

    /// With a pool name and a 50ms sampling interval configured, the background task must
    /// report `idle` and `used` data points on `db.client.connection.count`, tagged with
    /// the pool name, and `used` must reflect the connection held by the test.
    #[tokio::test]
    #[serial]
    async fn connection_count_reports_idle_and_used() {
        let tel = common::TestTelemetry::install();
        let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        let pool = PoolBuilder::from(raw)
            .with_pool_name(POOL_NAME)
            .with_pool_metrics_interval(Duration::from_millis(50))
            .build();

        let conn = pool.acquire().await.unwrap();

        // Poll for the condition that is actually asserted below, not merely for the
        // metric's presence: a sample taken before `acquire()` completed would report
        // `used == 0`, and stopping at that first snapshot would fail spuriously. The most
        // recent snapshot is kept so that a timeout still produces the specific assertion
        // messages below.
        let mut latest = None;
        poll_for(Duration::from_secs(2), || {
            let snapshot = tel.metrics();
            let settled = gauge_value(&snapshot, COUNT_METRIC, STATE_ATTR, "idle").is_some()
                && gauge_value(&snapshot, COUNT_METRIC, STATE_ATTR, "used")
                    .is_some_and(|used| used >= 1);
            latest = Some(snapshot);
            settled.then_some(())
        })
        .await;
        let metrics = latest.expect("poll_for runs the probe at least once");

        let metric = find_metric(&metrics, COUNT_METRIC)
            .expect("db.client.connection.count metric should be reported within 2s");

        let idle = gauge_value(&metrics, COUNT_METRIC, STATE_ATTR, "idle");
        let used = gauge_value(&metrics, COUNT_METRIC, STATE_ATTR, "used");

        assert!(idle.is_some(), "idle count should be reported");
        assert!(used.is_some(), "used count should be reported");
        assert!(used.unwrap() >= 1, "at least one connection should be used");

        // `gauge_value` only returns `Some` for i64 gauges, so this branch cannot be missed
        // after the assertions above; failing loudly keeps the pool-name check from ever
        // being skipped silently.
        let AggregatedMetrics::I64(MetricData::Gauge(gauge)) = metric.data() else {
            panic!("db.client.connection.count should be an i64 gauge");
        };
        let has_pool_name = gauge.data_points().any(|dp| {
            dp.attributes().any(|kv| {
                kv.key.as_str() == "db.client.connection.pool.name"
                    && kv.value.to_string() == POOL_NAME
            })
        });
        assert!(has_pool_name, "pool name attribute missing");

        drop(conn);
    }

    /// Without `with_pool_name`, no `db.client.connection.count` metric may appear even
    /// though a metrics interval is configured.
    #[tokio::test]
    #[serial]
    async fn no_pool_metrics_without_pool_name() {
        let tel = common::TestTelemetry::install();
        let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        let pool = PoolBuilder::from(raw)
            .with_pool_metrics_interval(Duration::from_millis(50))
            .build();

        // Asserting absence-after-wait: polling cannot replace this sleep, since the
        // expected outcome is that no metric ever arrives. The 100ms window is two task
        // intervals – long enough for a regression that *did* emit metrics to be caught.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let metrics = tel.metrics();
        let count = find_metric(&metrics, COUNT_METRIC);
        assert!(
            count.is_none(),
            "pool metrics should not be emitted without a pool name"
        );

        drop(pool);
    }

    /// Smoke test: drops a pool that has a background metrics task and then waits two
    /// sampling intervals. There is no explicit assertion; the test passes as long as
    /// nothing panics during or after the drop.
    #[tokio::test]
    #[serial]
    async fn background_task_stops_on_pool_drop() {
        let _tel = common::TestTelemetry::install();
        let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        let pool = PoolBuilder::from(raw)
            .with_pool_name(POOL_NAME)
            .with_pool_metrics_interval(Duration::from_millis(50))
            .build();

        drop(pool);
        // Asserting absence-after-drop: the sleep is the deliberate window during which a
        // still-running task would emit a recording. Polling cannot replace it because
        // the expected outcome is silence.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

// ===========================================================================
// Connection count via background task (runtime-async-std)
// ===========================================================================

#[cfg(all(feature = "runtime-async-std", not(feature = "runtime-tokio")))]
mod async_std_runtime {
    use super::*;

    /// Name of the gauge written by the background sampling task.
    const COUNT_METRIC: &str = "db.client.connection.count";
    /// Attribute that distinguishes the `idle` and `used` data points of [`COUNT_METRIC`].
    const STATE_ATTR: &str = "db.client.connection.state";

    /// With a pool name and a 50ms sampling interval configured, the background task must
    /// report `idle` and `used` data points on `db.client.connection.count`, and `used`
    /// must reflect the connection held by the test.
    #[tokio::test]
    #[serial]
    async fn connection_count_reports_idle_and_used() {
        let tel = common::TestTelemetry::install();
        let raw = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        let pool = PoolBuilder::from(raw)
            .with_pool_name(POOL_NAME)
            .with_pool_metrics_interval(Duration::from_millis(50))
            .build();

        let conn = pool.acquire().await.unwrap();

        // Poll for the condition that is actually asserted below, not merely for the
        // metric's presence: a sample taken before `acquire()` completed would report
        // `used == 0`, and stopping at that first snapshot would fail spuriously. The most
        // recent snapshot is kept so that a timeout still produces the specific assertion
        // messages below.
        let mut latest = None;
        poll_for(Duration::from_secs(2), || {
            let snapshot = tel.metrics();
            let settled = gauge_value(&snapshot, COUNT_METRIC, STATE_ATTR, "idle").is_some()
                && gauge_value(&snapshot, COUNT_METRIC, STATE_ATTR, "used")
                    .is_some_and(|used| used >= 1);
            latest = Some(snapshot);
            settled.then_some(())
        })
        .await;
        let metrics = latest.expect("poll_for runs the probe at least once");

        assert!(
            find_metric(&metrics, COUNT_METRIC).is_some(),
            "db.client.connection.count metric should be reported within 2s"
        );

        let idle = gauge_value(&metrics, COUNT_METRIC, STATE_ATTR, "idle");
        let used = gauge_value(&metrics, COUNT_METRIC, STATE_ATTR, "used");

        assert!(idle.is_some(), "idle count should be reported");
        assert!(used.is_some(), "used count should be reported");
        assert!(used.unwrap() >= 1, "at least one connection should be used");

        drop(conn);
    }
}