#![cfg(feature = "postgres")]
mod common;
use std::sync::OnceLock;
use std::time::Duration;
use common::{assert_common_span_attributes, assert_error_span, attr};
use futures::StreamExt;
use opentelemetry::trace::SpanKind;
use serial_test::serial;
use sqlx::Executor as _;
use sqlx::Postgres;
use sqlx_otel::{Pool, PoolBuilder, QueryAnnotations, Transaction};
use testcontainers::core::IntoContainerPort;
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage, ImageExt};
use tokio::sync::OnceCell;
const SYSTEM: &str = "postgresql";
struct SharedContainer {
_container: ContainerAsync<GenericImage>,
url: String,
}
static CONTAINER: OnceLock<OnceCell<SharedContainer>> = OnceLock::new();
async fn shared_container() -> &'static SharedContainer {
CONTAINER
.get_or_init(OnceCell::new)
.get_or_init(|| async {
let container = GenericImage::new("postgres", "15-alpine")
.with_wait_for(testcontainers::core::WaitFor::message_on_stderr(
"database system is ready to accept connections",
))
.with_exposed_port(5432.tcp())
.with_env_var("POSTGRES_USER", "postgres")
.with_env_var("POSTGRES_DB", "testdb")
.with_env_var("POSTGRES_HOST_AUTH_METHOD", "trust")
.with_startup_timeout(Duration::from_secs(60))
.start()
.await
.expect("starting postgres container");
let port = container.get_host_port_ipv4(5432).await.unwrap();
let url = format!("postgres://postgres@localhost:{port}/testdb");
SharedContainer {
_container: container,
url,
}
})
.await
}
async fn test_pool() -> Pool<Postgres> {
let shared = shared_container().await;
let raw = sqlx::PgPool::connect(&shared.url).await.unwrap();
PoolBuilder::from(raw).build()
}
fn test_annotations() -> QueryAnnotations {
QueryAnnotations::new()
.operation("SELECT")
.collection("users")
}
fn assert_annotated_span(span: &opentelemetry_sdk::trace::SpanData) {
assert_eq!(span.span_kind, SpanKind::Client);
assert_eq!(span.name, "SELECT users");
assert_eq!(
attr(span, "db.system.name"),
Some(opentelemetry::Value::String(SYSTEM.to_owned().into())),
);
assert_eq!(
attr(span, "db.operation.name"),
Some(opentelemetry::Value::String("SELECT".into())),
);
assert_eq!(
attr(span, "db.collection.name"),
Some(opentelemetry::Value::String("users".into())),
);
}
#[tokio::test]
#[serial]
async fn execute_creates_span_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
sqlx::query("CREATE TABLE IF NOT EXISTS exec_pool (id SERIAL PRIMARY KEY)")
.execute(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert!(attr(&spans[0], "db.response.affected_rows").is_some());
pool.with_annotations(test_annotations())
.execute("CREATE TABLE IF NOT EXISTS exec_pool (id SERIAL PRIMARY KEY)")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
pool.with_operation("SELECT", "users")
.execute("CREATE TABLE IF NOT EXISTS exec_pool3 (id SERIAL PRIMARY KEY)")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn execute_creates_span_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS exec_conn (id SERIAL PRIMARY KEY)")
.execute(&mut conn)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert!(attr(&spans[0], "db.response.affected_rows").is_some());
conn.with_annotations(test_annotations())
.execute("CREATE TABLE IF NOT EXISTS exec_conn (id SERIAL PRIMARY KEY)")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
conn.with_operation("SELECT", "users")
.execute("CREATE TABLE IF NOT EXISTS exec_conn3 (id SERIAL PRIMARY KEY)")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn execute_creates_span_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS exec_tx (id SERIAL PRIMARY KEY)")
.execute(&mut tx)
.await
.unwrap();
tx.with_annotations(test_annotations())
.execute("CREATE TABLE IF NOT EXISTS exec_tx (id SERIAL PRIMARY KEY)")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.execute("CREATE TABLE IF NOT EXISTS exec_tx3 (id SERIAL PRIMARY KEY)")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert!(attr(&spans[0], "db.response.affected_rows").is_some());
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn execute_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let result = sqlx::query("INVALID SQL GIBBERISH").execute(&pool).await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_error_span(&spans[0]);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
let result = pool
.with_annotations(test_annotations())
.execute("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let result = pool
.with_operation("SELECT", "users")
.execute("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn execute_records_affected_rows() {
let _setup_tel = common::TestTelemetry::install();
let pool = test_pool().await;
sqlx::query(
"CREATE TABLE IF NOT EXISTS affected_test (id INT PRIMARY KEY, name TEXT NOT NULL)",
)
.execute(&pool)
.await
.unwrap();
sqlx::query("DELETE FROM affected_test")
.execute(&pool)
.await
.unwrap();
let tel = common::TestTelemetry::install();
sqlx::query(
"INSERT INTO affected_test (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
)
.execute(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(3)),
"inserting 3 rows in one statement should affect 3 rows"
);
let tel = common::TestTelemetry::install();
sqlx::query(
"INSERT INTO affected_test (id, name) VALUES (1, 'alice_updated') \
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name",
)
.execute(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(1)),
"upsert should affect 1 row"
);
let tel = common::TestTelemetry::install();
sqlx::query("UPDATE affected_test SET name = name || '_updated' WHERE id IN (2, 3)")
.execute(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(2)),
"updating two rows should affect 2 rows"
);
let tel = common::TestTelemetry::install();
sqlx::query("DELETE FROM affected_test WHERE id IN (1, 2, 3)")
.execute(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(3)),
"deleting three rows should affect 3 rows"
);
let tel = common::TestTelemetry::install();
sqlx::query("DELETE FROM affected_test WHERE id = 999")
.execute(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(0)),
"deleting non-existent rows should affect 0 rows"
);
}
#[tokio::test]
#[serial]
async fn execute_many_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut stream = (&pool).execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
let mut stream = pool
.with_annotations(test_annotations())
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
let mut stream = pool
.with_operation("SELECT", "users")
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn execute_many_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let mut stream = (&mut conn).execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
let mut stream = conn
.with_annotations(test_annotations())
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
let mut stream = conn
.with_operation("SELECT", "users")
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn execute_many_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let mut stream = (&mut tx).execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_annotations(test_annotations())
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_operation("SELECT", "users")
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn execute_many_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut stream = (&pool).execute_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_error_span(&spans[0]);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
let mut stream = pool
.with_annotations(test_annotations())
.execute_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let mut stream = pool
.with_operation("SELECT", "users")
.execute_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn fetch_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut stream = (&pool).fetch("SELECT 1 UNION ALL SELECT 2");
let mut count = 0u64;
while stream.next().await.is_some() {
count += 1;
}
assert_eq!(count, 2);
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
let mut stream = pool
.with_annotations(test_annotations())
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
let mut stream = pool
.with_operation("SELECT", "users")
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let mut stream = (&mut conn).fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
let mut stream = conn
.with_annotations(test_annotations())
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
let mut stream = conn
.with_operation("SELECT", "users")
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let mut stream = (&mut tx).fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_annotations(test_annotations())
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_operation("SELECT", "users")
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_stream_dropped_early_still_records_span() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
{
let mut stream = (&pool).fetch("SELECT 1 UNION ALL SELECT 2");
let _ = stream.next().await;
}
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
}
#[tokio::test]
#[serial]
async fn fetch_stream_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut stream = (&pool).fetch("INVALID SQL");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
assert_error_span(&spans[0]);
let mut stream = pool
.with_annotations(test_annotations())
.fetch("INVALID SQL");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let mut stream = pool.with_operation("SELECT", "users").fetch("INVALID SQL");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn fetch_many_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut stream = (&pool).fetch_many("SELECT 1 UNION ALL SELECT 2");
let mut rows = 0u64;
while let Some(item) = stream.next().await {
if let Ok(sqlx::Either::Right(_)) = item {
rows += 1;
}
}
drop(stream);
assert_eq!(rows, 2);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
let mut stream = pool
.with_annotations(test_annotations())
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
let mut stream = pool
.with_operation("SELECT", "users")
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_many_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let mut stream = (&mut conn).fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
let mut stream = conn
.with_annotations(test_annotations())
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
let mut stream = conn
.with_operation("SELECT", "users")
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_many_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let mut stream = (&mut tx).fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_annotations(test_annotations())
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_operation("SELECT", "users")
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_many_dropped_early_still_records_span() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
{
let mut stream = (&pool).fetch_many("SELECT 1 UNION ALL SELECT 2");
let _ = stream.next().await;
}
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
}
#[tokio::test]
#[serial]
async fn fetch_many_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut stream = (&pool).fetch_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
assert_error_span(&spans[0]);
let mut stream = pool
.with_annotations(test_annotations())
.fetch_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let mut stream = pool
.with_operation("SELECT", "users")
.fetch_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn fetch_all_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let rows = (&pool)
.fetch_all("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
.await
.unwrap();
assert_eq!(rows.len(), 3);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(3))
);
pool.with_annotations(test_annotations())
.fetch_all("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
pool.with_operation("SELECT", "users")
.fetch_all("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_all_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let rows = (&mut conn)
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
assert_eq!(rows.len(), 2);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
conn.with_annotations(test_annotations())
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
conn.with_operation("SELECT", "users")
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_all_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let rows = (&mut tx)
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
assert_eq!(rows.len(), 2);
tx.with_annotations(test_annotations())
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_all_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let result = (&pool).fetch_all("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_error_span(&spans[0]);
let result = pool
.with_annotations(test_annotations())
.fetch_all("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let result = pool
.with_operation("SELECT", "users")
.fetch_all("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn fetch_one_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let _row = (&pool).fetch_one("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
pool.with_annotations(test_annotations())
.fetch_one("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
pool.with_operation("SELECT", "users")
.fetch_one("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_one_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let _row = (&mut conn).fetch_one("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
conn.with_annotations(test_annotations())
.fetch_one("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
conn.with_operation("SELECT", "users")
.fetch_one("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_one_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let _row = (&mut tx).fetch_one("SELECT 1").await.unwrap();
tx.with_annotations(test_annotations())
.fetch_one("SELECT 1")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.fetch_one("SELECT 1")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_one_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let result = (&pool).fetch_one("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_error_span(&spans[0]);
let result = pool
.with_annotations(test_annotations())
.fetch_one("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let result = pool
.with_operation("SELECT", "users")
.fetch_one("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn fetch_optional_records_one_row() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let result = (&pool).fetch_optional("SELECT 1").await.unwrap();
assert!(result.is_some());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
pool.with_annotations(test_annotations())
.fetch_optional("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
pool.with_operation("SELECT", "users")
.fetch_optional("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_optional_records_zero_rows() {
let _setup_tel = common::TestTelemetry::install();
let pool = test_pool().await;
sqlx::query("CREATE TABLE IF NOT EXISTS empty_table (id SERIAL PRIMARY KEY)")
.execute(&pool)
.await
.unwrap();
sqlx::query("DELETE FROM empty_table")
.execute(&pool)
.await
.unwrap();
let tel = common::TestTelemetry::install();
let result = (&pool)
.fetch_optional("SELECT id FROM empty_table")
.await
.unwrap();
assert!(result.is_none());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
}
#[tokio::test]
#[serial]
async fn fetch_optional_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let result = (&mut conn).fetch_optional("SELECT 42").await.unwrap();
assert!(result.is_some());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
conn.with_annotations(test_annotations())
.fetch_optional("SELECT 42")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
conn.with_operation("SELECT", "users")
.fetch_optional("SELECT 42")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_optional_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let result = (&mut tx).fetch_optional("SELECT 99").await.unwrap();
assert!(result.is_some());
tx.with_annotations(test_annotations())
.fetch_optional("SELECT 99")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.fetch_optional("SELECT 99")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert_eq!(
attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn fetch_optional_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let result = (&pool).fetch_optional("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_error_span(&spans[0]);
let result = pool
.with_annotations(test_annotations())
.fetch_optional("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let result = pool
.with_operation("SELECT", "users")
.fetch_optional("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn prepare_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let _stmt = (&pool).prepare("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
pool.with_annotations(test_annotations())
.prepare("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
pool.with_operation("SELECT", "users")
.prepare("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn prepare_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let _stmt = (&mut conn).prepare("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
conn.with_annotations(test_annotations())
.prepare("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
conn.with_operation("SELECT", "users")
.prepare("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn prepare_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let _stmt = (&mut tx).prepare("SELECT 1").await.unwrap();
tx.with_annotations(test_annotations())
.prepare("SELECT 1")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.prepare("SELECT 1")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn prepare_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let result = (&mut conn).prepare("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_error_span(&spans[0]);
let result = conn
.with_annotations(test_annotations())
.prepare("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let result = conn
.with_operation("SELECT", "users")
.prepare("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn prepare_with_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let _stmt = (&pool).prepare_with("SELECT $1", &[]).await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
pool.with_annotations(test_annotations())
.prepare_with("SELECT $1", &[])
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
pool.with_operation("SELECT", "users")
.prepare_with("SELECT $1", &[])
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn prepare_with_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let _stmt = (&mut conn).prepare_with("SELECT $1", &[]).await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
conn.with_annotations(test_annotations())
.prepare_with("SELECT $1", &[])
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
conn.with_operation("SELECT", "users")
.prepare_with("SELECT $1", &[])
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn prepare_with_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let _stmt = (&mut tx).prepare_with("SELECT $1", &[]).await.unwrap();
tx.with_annotations(test_annotations())
.prepare_with("SELECT $1", &[])
.await
.unwrap();
tx.with_operation("SELECT", "users")
.prepare_with("SELECT $1", &[])
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn prepare_with_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let result = (&mut conn).prepare_with("INVALID SQL GIBBERISH", &[]).await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_error_span(&spans[0]);
let result = conn
.with_annotations(test_annotations())
.prepare_with("INVALID SQL GIBBERISH", &[])
.await;
assert!(result.is_err());
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let result = conn
.with_operation("SELECT", "users")
.prepare_with("INVALID SQL GIBBERISH", &[])
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn describe_via_pool() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let _desc = (&pool).describe("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
pool.with_annotations(test_annotations())
.describe("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
pool.with_operation("SELECT", "users")
.describe("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn describe_via_connection() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let _desc = (&mut conn).describe("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
conn.with_annotations(test_annotations())
.describe("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
conn.with_operation("SELECT", "users")
.describe("SELECT 1")
.await
.unwrap();
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn describe_via_transaction() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
let _desc = (&mut tx).describe("SELECT 1").await.unwrap();
tx.with_annotations(test_annotations())
.describe("SELECT 1")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.describe("SELECT 1")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_annotated_span(tel.spans().last().unwrap());
}
#[tokio::test]
#[serial]
async fn describe_records_error() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut conn = pool.acquire().await.unwrap();
let result = (&mut conn).describe("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
assert!(attr(&spans[0], "db.response.returned_rows").is_none());
assert_error_span(&spans[0]);
let result = conn
.with_annotations(test_annotations())
.describe("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let spans = tel.spans();
let last = spans.last().unwrap();
assert_annotated_span(last);
assert_error_span(last);
let result = conn
.with_operation("SELECT", "users")
.describe("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
assert_annotated_span(&last);
assert_error_span(&last);
}
#[tokio::test]
#[serial]
async fn connection_attributes_populated() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let _row = (&pool).fetch_one("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "server.address"),
Some(opentelemetry::Value::String("localhost".into()))
);
assert!(
attr(&spans[0], "server.port").is_some(),
"server.port missing"
);
assert_eq!(
attr(&spans[0], "db.namespace"),
Some(opentelemetry::Value::String("testdb".into()))
);
}
#[tokio::test]
#[serial]
async fn sqlstate_recorded_on_constraint_violation() {
let _setup_tel = common::TestTelemetry::install();
let pool = test_pool().await;
sqlx::query("CREATE TABLE IF NOT EXISTS unique_test (id INT PRIMARY KEY)")
.execute(&pool)
.await
.unwrap();
sqlx::query("DELETE FROM unique_test")
.execute(&pool)
.await
.unwrap();
sqlx::query("INSERT INTO unique_test (id) VALUES (1)")
.execute(&pool)
.await
.unwrap();
let tel = common::TestTelemetry::install();
let result = sqlx::query("INSERT INTO unique_test (id) VALUES (1)")
.execute(&pool)
.await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_error_span(&spans[0]);
assert_eq!(
attr(&spans[0], "db.response.status_code"),
Some(opentelemetry::Value::String("23505".into()))
);
}
#[tokio::test]
#[serial]
async fn operation_duration_metric_is_recorded() {
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let _row = (&pool).fetch_one("SELECT 1").await.unwrap();
let resource_metrics = tel.metrics();
assert!(!resource_metrics.is_empty(), "should have metric data");
let mut found_duration = false;
for rm in &resource_metrics {
for sm in rm.scope_metrics() {
for metric in sm.metrics() {
if metric.name() == "db.client.operation.duration" {
found_duration = true;
assert_eq!(metric.unit(), "s");
if let AggregatedMetrics::F64(MetricData::Histogram(hist)) = metric.data() {
let dp: Vec<_> = hist.data_points().collect();
assert!(!dp.is_empty(), "histogram should have data points");
assert!(dp[0].count() > 0, "data point count should be > 0");
let has_system = dp[0]
.attributes()
.any(|kv| kv.key.as_str() == "db.system.name");
assert!(has_system, "metric should have db.system.name attribute");
} else {
panic!("db.client.operation.duration should be an f64 histogram");
}
}
}
}
}
assert!(
found_duration,
"db.client.operation.duration metric not found"
);
}
#[tokio::test]
#[serial]
async fn transaction_rollback() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
let mut tx: Transaction<'_, Postgres> = pool.begin().await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS rollback_test (id SERIAL PRIMARY KEY)")
.execute(&mut tx)
.await
.unwrap();
tx.rollback().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_common_span_attributes(&spans[0], SYSTEM);
}
#[tokio::test]
#[serial]
async fn builder_with_database_overrides_namespace() {
let tel = common::TestTelemetry::install();
let shared = shared_container().await;
let raw = sqlx::PgPool::connect(&shared.url).await.unwrap();
let pool = PoolBuilder::from(raw).with_database("custom_db").build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "db.namespace"),
Some(opentelemetry::Value::String("custom_db".into()))
);
}
#[tokio::test]
#[serial]
async fn builder_with_host_overrides_server_address() {
let tel = common::TestTelemetry::install();
let shared = shared_container().await;
let raw = sqlx::PgPool::connect(&shared.url).await.unwrap();
let pool = PoolBuilder::from(raw).with_host("custom-host").build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "server.address"),
Some(opentelemetry::Value::String("custom-host".into()))
);
}
#[tokio::test]
#[serial]
async fn builder_with_port_overrides_server_port() {
let tel = common::TestTelemetry::install();
let shared = shared_container().await;
let raw = sqlx::PgPool::connect(&shared.url).await.unwrap();
let pool = PoolBuilder::from(raw).with_port(9999).build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "server.port"),
Some(opentelemetry::Value::I64(9999))
);
}
#[tokio::test]
#[serial]
async fn builder_with_network_peer_address() {
let tel = common::TestTelemetry::install();
let shared = shared_container().await;
let raw = sqlx::PgPool::connect(&shared.url).await.unwrap();
let pool = PoolBuilder::from(raw)
.with_network_peer_address("10.0.0.5")
.build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "network.peer.address"),
Some(opentelemetry::Value::String("10.0.0.5".into()))
);
}
#[tokio::test]
#[serial]
async fn builder_with_network_peer_port() {
let tel = common::TestTelemetry::install();
let shared = shared_container().await;
let raw = sqlx::PgPool::connect(&shared.url).await.unwrap();
let pool = PoolBuilder::from(raw).with_network_peer_port(5433).build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "network.peer.port"),
Some(opentelemetry::Value::I64(5433))
);
}
#[tokio::test]
#[serial]
async fn pool_close_and_is_closed() {
let _tel = common::TestTelemetry::install();
let pool = test_pool().await;
assert!(!pool.is_closed());
pool.close().await;
assert!(pool.is_closed());
}
#[tokio::test]
#[serial]
async fn query_text_mode_off_suppresses_sql() {
let shared = shared_container().await;
let raw = sqlx::PgPool::connect(&shared.url).await.unwrap();
let pool = PoolBuilder::from(raw)
.with_query_text_mode(sqlx_otel::QueryTextMode::Off)
.build();
let tel = common::TestTelemetry::install();
let _row = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].span_kind, SpanKind::Client);
assert_eq!(
attr(&spans[0], "db.system.name"),
Some(opentelemetry::Value::String(SYSTEM.to_owned().into()))
);
assert!(attr(&spans[0], "db.namespace").is_some());
assert!(
attr(&spans[0], "db.query.text").is_none(),
"db.query.text should not be present when QueryTextMode::Off"
);
}
#[tokio::test]
#[serial]
async fn query_text_mode_obfuscated_replaces_literals() {
let shared = shared_container().await;
let raw = sqlx::PgPool::connect(&shared.url).await.unwrap();
let pool = PoolBuilder::from(raw)
.with_query_text_mode(sqlx_otel::QueryTextMode::Obfuscated)
.build();
let tel = common::TestTelemetry::install();
let _row = (&pool)
.fetch_optional("SELECT 1, 'alice', 3.14")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
attr(&spans[0], "db.query.text"),
Some(opentelemetry::Value::String("SELECT ?, ?, ?".into()))
);
}
#[tokio::test]
#[serial]
async fn annotation_all_four_fields() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
pool.with_annotations(
QueryAnnotations::new()
.operation("SELECT")
.collection("users")
.query_summary("users by id")
.stored_procedure("sp_get_users"),
)
.fetch_all("SELECT 1")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].name, "users by id");
assert_eq!(
attr(&spans[0], "db.operation.name"),
Some(opentelemetry::Value::String("SELECT".into())),
);
assert_eq!(
attr(&spans[0], "db.collection.name"),
Some(opentelemetry::Value::String("users".into())),
);
assert_eq!(
attr(&spans[0], "db.query.summary"),
Some(opentelemetry::Value::String("users by id".into())),
);
assert_eq!(
attr(&spans[0], "db.stored_procedure.name"),
Some(opentelemetry::Value::String("sp_get_users".into())),
);
}
#[tokio::test]
#[serial]
async fn query_summary_drives_span_name() {
let tel = common::TestTelemetry::install();
let pool = test_pool().await;
pool.with_annotations(
QueryAnnotations::new()
.operation("SELECT")
.collection("users")
.query_summary("users by tenant"),
)
.fetch_all("SELECT 1")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].name, "users by tenant");
assert_eq!(
attr(&spans[0], "db.query.summary"),
Some(opentelemetry::Value::String("users by tenant".into())),
);
assert_eq!(
attr(&spans[0], "db.operation.name"),
Some(opentelemetry::Value::String("SELECT".into())),
);
assert_eq!(
attr(&spans[0], "db.collection.name"),
Some(opentelemetry::Value::String("users".into())),
);
}