#![allow(dead_code, clippy::must_use_candidate, clippy::missing_panics_doc)]
use opentelemetry::trace::{SpanKind, Status};
use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::trace::{InMemorySpanExporter, SdkTracerProvider, SpanData};
use sqlx_otel::QueryAnnotations;
pub struct TestTelemetry {
span_exporter: InMemorySpanExporter,
metric_exporter: InMemoryMetricExporter,
tracer_provider: SdkTracerProvider,
meter_provider: SdkMeterProvider,
}
impl TestTelemetry {
#[must_use]
pub fn install() -> Self {
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
let metric_exporter = InMemoryMetricExporter::default();
let reader = PeriodicReader::builder(metric_exporter.clone()).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
opentelemetry::global::set_meter_provider(meter_provider.clone());
Self {
span_exporter,
metric_exporter,
tracer_provider,
meter_provider,
}
}
#[must_use]
pub fn spans(&self) -> Vec<SpanData> {
let _ = self.tracer_provider.force_flush();
self.span_exporter.get_finished_spans().unwrap_or_default()
}
#[must_use]
pub fn metrics(&self) -> Vec<opentelemetry_sdk::metrics::data::ResourceMetrics> {
let _ = self.meter_provider.force_flush();
self.metric_exporter
.get_finished_metrics()
.unwrap_or_default()
}
pub fn reset(&self) {
let _ = self.tracer_provider.force_flush();
let _ = self.meter_provider.force_flush();
self.span_exporter.reset();
self.metric_exporter.reset();
}
}
impl Drop for TestTelemetry {
fn drop(&mut self) {
let _ = self.tracer_provider.shutdown();
let _ = self.meter_provider.shutdown();
}
}
pub fn attr(span: &SpanData, key: &str) -> Option<opentelemetry::Value> {
span.attributes
.iter()
.find(|kv| kv.key.as_str() == key)
.map(|kv| kv.value.clone())
}
pub fn metric_attr(
dp: &opentelemetry_sdk::metrics::data::HistogramDataPoint<f64>,
key: &str,
) -> Option<opentelemetry::Value> {
dp.attributes()
.find(|kv| kv.key.as_str() == key)
.map(|kv| kv.value.clone())
}
pub fn find_histogram_data_point_with(
metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
metric_name: &str,
expected: &[(&str, &str)],
) -> Option<opentelemetry_sdk::metrics::data::HistogramDataPoint<f64>> {
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
for rm in metrics {
for sm in rm.scope_metrics() {
for metric in sm.metrics() {
if metric.name() != metric_name {
continue;
}
if let AggregatedMetrics::F64(MetricData::Histogram(hist)) = metric.data() {
for dp in hist.data_points() {
let matches = expected.iter().all(|(k, v)| {
dp.attributes().any(|kv| {
kv.key.as_str() == *k
&& matches!(
&kv.value,
opentelemetry::Value::String(s) if s.as_str() == *v
)
})
});
if matches {
return Some(dp.clone());
}
}
}
}
}
}
None
}
pub fn assert_affected_rows_metric(tel: &TestTelemetry, system: &str) {
let metrics = tel.metrics();
let dp = find_histogram_data_point_with(
&metrics,
"db.client.response.affected_rows",
&[("db.system.name", system)],
)
.unwrap_or_else(|| {
panic!("no db.client.response.affected_rows data point found for system {system:?}")
});
assert!(
dp.count() > 0,
"db.client.response.affected_rows data point has zero count",
);
}
pub fn find_duration_data_point(
metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
) -> Option<opentelemetry_sdk::metrics::data::HistogramDataPoint<f64>> {
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
for rm in metrics {
for sm in rm.scope_metrics() {
for metric in sm.metrics() {
if metric.name() != "db.client.operation.duration" {
continue;
}
if let AggregatedMetrics::F64(MetricData::Histogram(hist)) = metric.data() {
if let Some(dp) = hist.data_points().next() {
return Some(dp.clone());
}
}
}
}
}
None
}
pub fn find_duration_data_point_with(
metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
expected: &[(&str, &str)],
) -> Option<opentelemetry_sdk::metrics::data::HistogramDataPoint<f64>> {
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
for rm in metrics {
for sm in rm.scope_metrics() {
for metric in sm.metrics() {
if metric.name() != "db.client.operation.duration" {
continue;
}
if let AggregatedMetrics::F64(MetricData::Histogram(hist)) = metric.data() {
for dp in hist.data_points() {
let matches = expected.iter().all(|(k, v)| {
dp.attributes().any(|kv| {
kv.key.as_str() == *k
&& matches!(
&kv.value,
opentelemetry::Value::String(s) if s.as_str() == *v
)
})
});
if matches {
return Some(dp.clone());
}
}
}
}
}
}
None
}
pub fn assert_metric_data_point(tel: &TestTelemetry, expected: &[(&str, &str)]) {
let metrics = tel.metrics();
let dp = find_duration_data_point_with(&metrics, expected).unwrap_or_else(|| {
panic!(
"no db.client.operation.duration data point found matching expected attrs {expected:?}",
)
});
assert!(
dp.count() > 0,
"matching data point has zero count for expected attrs {expected:?}",
);
}
pub fn assert_metric_for_system(tel: &TestTelemetry, system: &str) {
assert_metric_data_point(tel, &[("db.system.name", system)]);
}
pub fn assert_annotated_metric(tel: &TestTelemetry, dialect: &Dialect) {
assert_metric_data_point(
tel,
&[
("db.system.name", dialect.system),
("db.operation.name", "SELECT"),
("db.collection.name", "users"),
],
);
}
pub fn assert_annotated_error_metric(tel: &TestTelemetry, dialect: &Dialect) {
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
let metrics = tel.metrics();
let dp = metrics
.iter()
.flat_map(opentelemetry_sdk::metrics::data::ResourceMetrics::scope_metrics)
.flat_map(opentelemetry_sdk::metrics::data::ScopeMetrics::metrics)
.filter(|m| m.name() == "db.client.operation.duration")
.filter_map(|m| {
if let AggregatedMetrics::F64(MetricData::Histogram(hist)) = m.data() {
Some(hist)
} else {
None
}
})
.flat_map(opentelemetry_sdk::metrics::data::Histogram::data_points)
.find(|dp| {
let attr_eq = |key: &str, value: &str| {
dp.attributes().any(|kv| {
kv.key.as_str() == key
&& matches!(
&kv.value,
opentelemetry::Value::String(s) if s.as_str() == value
)
})
};
let attr_present = |key: &str| {
dp.attributes().any(|kv| {
kv.key.as_str() == key
&& matches!(
&kv.value,
opentelemetry::Value::String(s) if !s.as_str().is_empty()
)
})
};
attr_eq("db.system.name", dialect.system)
&& attr_eq("db.operation.name", "SELECT")
&& attr_eq("db.collection.name", "users")
&& attr_present("error.type")
});
assert!(
dp.is_some(),
"no annotated db.client.operation.duration data point with error.type and \
db.system.name = {:?} found",
dialect.system,
);
}
pub fn assert_error_metric(tel: &TestTelemetry, dialect: &Dialect) {
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
let metrics = tel.metrics();
let dp = metrics
.iter()
.flat_map(opentelemetry_sdk::metrics::data::ResourceMetrics::scope_metrics)
.flat_map(opentelemetry_sdk::metrics::data::ScopeMetrics::metrics)
.filter(|m| m.name() == "db.client.operation.duration")
.filter_map(|m| {
if let AggregatedMetrics::F64(MetricData::Histogram(hist)) = m.data() {
Some(hist)
} else {
None
}
})
.flat_map(opentelemetry_sdk::metrics::data::Histogram::data_points)
.find(|dp| {
let has_error_type = dp.attributes().any(|kv| {
kv.key.as_str() == "error.type"
&& matches!(&kv.value, opentelemetry::Value::String(s) if !s.as_str().is_empty())
});
let has_system = dp.attributes().any(|kv| {
kv.key.as_str() == "db.system.name"
&& matches!(
&kv.value,
opentelemetry::Value::String(s) if s.as_str() == dialect.system
)
});
has_error_type && has_system
});
assert!(
dp.is_some(),
"no db.client.operation.duration data point with error.type and db.system.name = {:?} found",
dialect.system,
);
}
pub fn assert_common_span_attributes(span: &SpanData, system: &str) {
assert_eq!(span.span_kind, SpanKind::Client);
assert_eq!(
span.name, system,
"span name should fall back to db.system.name"
);
assert_eq!(
attr(span, "db.system.name"),
Some(opentelemetry::Value::String(system.to_owned().into())),
"db.system.name missing or wrong"
);
let namespace = attr(span, "db.namespace");
assert!(
matches!(&namespace, Some(opentelemetry::Value::String(s)) if !s.as_str().is_empty()),
"db.namespace should be a non-empty string, got {namespace:?}",
);
let query_text = attr(span, "db.query.text");
assert!(
matches!(&query_text, Some(opentelemetry::Value::String(s)) if !s.as_str().is_empty()),
"db.query.text should be a non-empty string, got {query_text:?}",
);
}
#[derive(Debug, sqlx::FromRow)]
pub struct MacroUser<Id> {
pub id: Id,
pub name: String,
}
pub fn assert_error_span(span: &SpanData) {
assert!(
matches!(&span.status, Status::Error { .. }),
"span status should be Error, got {:?}",
span.status
);
let error_type = attr(span, "error.type");
assert!(
matches!(&error_type, Some(opentelemetry::Value::String(s)) if !s.as_str().is_empty()),
"error.type should be a non-empty string, got {error_type:?}",
);
let event = span
.events
.iter()
.find(|e| e.name == "exception")
.expect("exception event missing");
let exception_type = event
.attributes
.iter()
.find(|kv| kv.key.as_str() == "exception.type")
.map(|kv| kv.value.clone());
assert!(
matches!(&exception_type, Some(opentelemetry::Value::String(s)) if !s.as_str().is_empty()),
"exception.type should be a non-empty string, got {exception_type:?}",
);
let exception_message = event
.attributes
.iter()
.find(|kv| kv.key.as_str() == "exception.message")
.map(|kv| kv.value.clone());
assert!(
matches!(&exception_message, Some(opentelemetry::Value::String(s)) if !s.as_str().is_empty()),
"exception.message should be a non-empty string, got {exception_message:?}",
);
}
pub struct Dialect {
pub system: &'static str,
pub id_pk_column: &'static str,
pub text_column: &'static str,
pub upsert_sql: &'static str,
pub upsert_affected_rows: i64,
pub string_concat_update_sql: &'static str,
pub bind_two_sum_sql: &'static str,
pub prepare_with_select_sql: &'static str,
}
pub const SQLITE_DIALECT: Dialect = Dialect {
system: "sqlite",
id_pk_column: "INTEGER PRIMARY KEY",
text_column: "TEXT NOT NULL",
upsert_sql: "INSERT OR REPLACE INTO affected_test (id, name) VALUES (1, 'alice_updated')",
upsert_affected_rows: 1,
string_concat_update_sql: "UPDATE affected_test SET name = name || '_updated' WHERE id IN (2, 3)",
bind_two_sum_sql: "SELECT ?1 + ?2 AS sum",
prepare_with_select_sql: "SELECT ?",
};
pub const POSTGRES_DIALECT: Dialect = Dialect {
system: "postgresql",
id_pk_column: "INT PRIMARY KEY",
text_column: "TEXT NOT NULL",
upsert_sql: "INSERT INTO affected_test (id, name) VALUES (1, 'alice_updated') \
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name",
upsert_affected_rows: 1,
string_concat_update_sql: "UPDATE affected_test SET name = name || '_updated' WHERE id IN (2, 3)",
bind_two_sum_sql: "SELECT ($1::bigint + $2::bigint) AS sum",
prepare_with_select_sql: "SELECT $1",
};
pub const MYSQL_DIALECT: Dialect = Dialect {
system: "mysql",
id_pk_column: "INT PRIMARY KEY",
text_column: "VARCHAR(255) NOT NULL",
upsert_sql: "INSERT INTO affected_test (id, name) VALUES (1, 'alice_updated') \
ON DUPLICATE KEY UPDATE name = VALUES(name)",
upsert_affected_rows: 2,
string_concat_update_sql: "UPDATE affected_test SET name = CONCAT(name, '_updated') WHERE id IN (2, 3)",
bind_two_sum_sql: "SELECT CAST(? + ? AS SIGNED) AS sum",
prepare_with_select_sql: "SELECT ?",
};
#[macro_export]
macro_rules! fresh_table {
($pool:expr, $table:expr, $columns:expr) => {{
use sqlx::Executor as _;
let drop_sql = format!("DROP TABLE IF EXISTS {}", $table);
$pool.execute(drop_sql.as_str()).await.unwrap();
let create_sql = format!("CREATE TABLE {} ({})", $table, $columns);
$pool.execute(create_sql.as_str()).await.unwrap();
}};
}
pub fn test_annotations() -> QueryAnnotations {
QueryAnnotations::new()
.operation("SELECT")
.collection("users")
}
pub fn assert_annotated_span(span: &SpanData, dialect: &Dialect) {
assert_eq!(span.span_kind, SpanKind::Client);
assert_eq!(span.name, "SELECT users");
assert_eq!(
attr(span, "db.system.name"),
Some(opentelemetry::Value::String(dialect.system.into())),
);
assert_eq!(
attr(span, "db.operation.name"),
Some(opentelemetry::Value::String("SELECT".into())),
);
assert_eq!(
attr(span, "db.collection.name"),
Some(opentelemetry::Value::String("users".into())),
);
}
pub fn assert_one_annotated_span(tel: &TestTelemetry, dialect: &Dialect) {
let spans = tel.spans();
assert_eq!(
spans.len(),
1,
"expected exactly one span, got {}",
spans.len()
);
assert_annotated_span(&spans[0], dialect);
}
pub fn assert_span_parity(case: &str, exec_spans: &[SpanData], query_spans: &[SpanData]) {
assert_eq!(
exec_spans.len(),
1,
"{case}: executor-side emitted {} spans, expected 1",
exec_spans.len()
);
assert_eq!(
query_spans.len(),
1,
"{case}: query-side emitted {} spans, expected 1",
query_spans.len()
);
let exec = &exec_spans[0];
let query = &query_spans[0];
assert_eq!(exec.name, query.name, "{case}: span name differs");
assert_eq!(exec.span_kind, query.span_kind, "{case}: span kind differs");
for key in &[
"db.system.name",
"db.operation.name",
"db.collection.name",
"db.query.text",
"db.query.summary",
"db.namespace",
"db.stored_procedure.name",
"server.address",
"server.port",
"db.response.affected_rows",
"db.response.returned_rows",
] {
assert_eq!(
attr(exec, key),
attr(query, key),
"{case}: attribute `{key}` differs across executor-side vs query-side",
);
}
}
#[macro_export]
macro_rules! test_execute_creates_span_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
$crate::fresh_table!(
&pool,
"exec_pool_test",
&format!("id {}", $dialect.id_pk_column)
);
tel.reset();
(&pool)
.execute("INSERT INTO exec_pool_test (id) VALUES (1)")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
assert!($crate::common::attr(&spans[0], "db.response.affected_rows").is_some());
pool.with_annotations($crate::common::test_annotations())
.execute("INSERT INTO exec_pool_test (id) VALUES (2)")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
pool.with_operation("SELECT", "users")
.execute("INSERT INTO exec_pool_test (id) VALUES (3)")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_execute_creates_span_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
$crate::fresh_table!(
&pool,
"exec_conn_test",
&format!("id {}", $dialect.id_pk_column)
);
tel.reset();
let mut conn = pool.acquire().await.unwrap();
(&mut conn)
.execute("INSERT INTO exec_conn_test (id) VALUES (1)")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
assert!($crate::common::attr(&spans[0], "db.response.affected_rows").is_some());
conn.with_annotations($crate::common::test_annotations())
.execute("INSERT INTO exec_conn_test (id) VALUES (2)")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
conn.with_operation("SELECT", "users")
.execute("INSERT INTO exec_conn_test (id) VALUES (3)")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_execute_creates_span_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
$crate::fresh_table!(
&pool,
"exec_tx_test",
&format!("id {}", $dialect.id_pk_column)
);
tel.reset();
let mut tx = pool.begin().await.unwrap();
(&mut tx)
.execute("INSERT INTO exec_tx_test (id) VALUES (1)")
.await
.unwrap();
tx.with_annotations($crate::common::test_annotations())
.execute("INSERT INTO exec_tx_test (id) VALUES (2)")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.execute("INSERT INTO exec_tx_test (id) VALUES (3)")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
assert!($crate::common::attr(&spans[0], "db.response.affected_rows").is_some());
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_execute_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result = (&pool).execute("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
let result = pool
.with_annotations($crate::common::test_annotations())
.execute("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let result = pool
.with_operation("SELECT", "users")
.execute("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_execute_many_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = (&pool).execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
let mut stream = pool
.with_annotations($crate::common::test_annotations())
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
let mut stream = pool
.with_operation("SELECT", "users")
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_execute_many_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let mut stream = (&mut conn).execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
let mut stream = conn
.with_annotations($crate::common::test_annotations())
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
let mut stream = conn
.with_operation("SELECT", "users")
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_execute_many_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let mut stream = (&mut tx).execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_annotations($crate::common::test_annotations())
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_operation("SELECT", "users")
.execute_many("SELECT 1; SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_execute_many_records_error {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = (&pool).execute_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
let mut stream = pool
.with_annotations($crate::common::test_annotations())
.execute_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let mut stream = pool
.with_operation("SELECT", "users")
.execute_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = (&pool).fetch("SELECT 1 UNION ALL SELECT 2");
let mut count = 0u64;
while stream.next().await.is_some() {
count += 1;
}
assert_eq!(count, 2);
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
let mut stream = pool
.with_annotations($crate::common::test_annotations())
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
let mut stream = pool
.with_operation("SELECT", "users")
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let mut stream = (&mut conn).fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
let mut stream = conn
.with_annotations($crate::common::test_annotations())
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
let mut stream = conn
.with_operation("SELECT", "users")
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let mut stream = (&mut tx).fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_annotations($crate::common::test_annotations())
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_operation("SELECT", "users")
.fetch("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_stream_dropped_early_still_records_span {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
{
let mut stream = (&pool).fetch("SELECT 1 UNION ALL SELECT 2");
let _ = stream.next().await;
}
let spans = tel.spans();
assert_eq!(
spans.len(),
1,
"span should be recorded even when stream is dropped early"
);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
$crate::common::assert_metric_for_system(&tel, $dialect.system);
}};
}
#[macro_export]
macro_rules! test_fetch_stream_records_error {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = (&pool).fetch("INVALID SQL");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
let mut stream = pool
.with_annotations($crate::common::test_annotations())
.fetch("INVALID SQL");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let mut stream = pool.with_operation("SELECT", "users").fetch("INVALID SQL");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_many_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = (&pool).fetch_many("SELECT 1 UNION ALL SELECT 2");
let mut rows = 0u64;
let mut results = 0u64;
while let Some(item) = stream.next().await {
match item.unwrap() {
sqlx::Either::Left(_) => results += 1,
sqlx::Either::Right(_) => rows += 1,
}
}
drop(stream);
assert_eq!(rows, 2);
assert!(results >= 1, "should have at least one QueryResult");
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
let mut stream = pool
.with_annotations($crate::common::test_annotations())
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
let mut stream = pool
.with_operation("SELECT", "users")
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_many_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let mut stream = (&mut conn).fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
let mut stream = conn
.with_annotations($crate::common::test_annotations())
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
let mut stream = conn
.with_operation("SELECT", "users")
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_many_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let mut stream = (&mut tx).fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_annotations($crate::common::test_annotations())
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
let mut stream = tx
.with_operation("SELECT", "users")
.fetch_many("SELECT 1 UNION ALL SELECT 2");
while stream.next().await.is_some() {}
drop(stream);
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_many_dropped_early_still_records_span {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
{
let mut stream = (&pool).fetch_many("SELECT 1 UNION ALL SELECT 2");
let _ = stream.next().await;
}
let spans = tel.spans();
assert_eq!(
spans.len(),
1,
"span should be recorded even when stream is dropped early"
);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
$crate::common::assert_metric_for_system(&tel, $dialect.system);
}};
}
#[macro_export]
macro_rules! test_fetch_many_records_error {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = (&pool).fetch_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
let mut stream = pool
.with_annotations($crate::common::test_annotations())
.fetch_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let mut stream = pool
.with_operation("SELECT", "users")
.fetch_many("INVALID SQL GIBBERISH");
let result = stream.next().await;
assert!(result.is_some_and(|r| r.is_err()));
drop(stream);
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_all_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let rows = (&pool)
.fetch_all("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
.await
.unwrap();
assert_eq!(rows.len(), 3);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(3))
);
pool.with_annotations($crate::common::test_annotations())
.fetch_all("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
pool.with_operation("SELECT", "users")
.fetch_all("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_all_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let rows = (&mut conn)
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
assert_eq!(rows.len(), 2);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
conn.with_annotations($crate::common::test_annotations())
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
conn.with_operation("SELECT", "users")
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_all_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let rows = (&mut tx)
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
assert_eq!(rows.len(), 2);
tx.with_annotations($crate::common::test_annotations())
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.fetch_all("SELECT 1 UNION ALL SELECT 2")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_all_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result = (&pool).fetch_all("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
let result = pool
.with_annotations($crate::common::test_annotations())
.fetch_all("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let result = pool
.with_operation("SELECT", "users")
.fetch_all("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_one_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let _row = (&pool).fetch_one("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
pool.with_annotations($crate::common::test_annotations())
.fetch_one("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
pool.with_operation("SELECT", "users")
.fetch_one("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_one_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let _row = (&mut conn).fetch_one("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
conn.with_annotations($crate::common::test_annotations())
.fetch_one("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
conn.with_operation("SELECT", "users")
.fetch_one("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_one_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let _row = (&mut tx).fetch_one("SELECT 1").await.unwrap();
tx.with_annotations($crate::common::test_annotations())
.fetch_one("SELECT 1")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.fetch_one("SELECT 1")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_one_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result = (&pool).fetch_one("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
let result = pool
.with_annotations($crate::common::test_annotations())
.fetch_one("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let result = pool
.with_operation("SELECT", "users")
.fetch_one("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_optional_records_one_row {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result = (&pool).fetch_optional("SELECT 1").await.unwrap();
assert!(result.is_some());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
pool.with_annotations($crate::common::test_annotations())
.fetch_optional("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
pool.with_operation("SELECT", "users")
.fetch_optional("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_optional_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let result = (&mut conn).fetch_optional("SELECT 1").await.unwrap();
assert!(result.is_some());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
conn.with_annotations($crate::common::test_annotations())
.fetch_optional("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
conn.with_operation("SELECT", "users")
.fetch_optional("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_optional_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let result = (&mut tx).fetch_optional("SELECT 1").await.unwrap();
assert!(result.is_some());
tx.with_annotations($crate::common::test_annotations())
.fetch_optional("SELECT 1")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.fetch_optional("SELECT 1")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_fetch_optional_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result = (&pool).fetch_optional("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
None
);
let result = pool
.with_annotations($crate::common::test_annotations())
.fetch_optional("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let result = pool
.with_operation("SELECT", "users")
.fetch_optional("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_prepare_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let _stmt = (&pool).prepare("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
pool.with_annotations($crate::common::test_annotations())
.prepare("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
pool.with_operation("SELECT", "users")
.prepare("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_prepare_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let _stmt = (&mut conn).prepare("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
conn.with_annotations($crate::common::test_annotations())
.prepare("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
conn.with_operation("SELECT", "users")
.prepare("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_prepare_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let _stmt = (&mut tx).prepare("SELECT 1").await.unwrap();
tx.with_annotations($crate::common::test_annotations())
.prepare("SELECT 1")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.prepare("SELECT 1")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_prepare_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let result = (&mut conn).prepare("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
let result = conn
.with_annotations($crate::common::test_annotations())
.prepare("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let result = conn
.with_operation("SELECT", "users")
.prepare("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_prepare_with_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let _stmt = (&pool)
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
pool.with_annotations($crate::common::test_annotations())
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
pool.with_operation("SELECT", "users")
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_prepare_with_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let _stmt = (&mut conn)
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
conn.with_annotations($crate::common::test_annotations())
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
conn.with_operation("SELECT", "users")
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_prepare_with_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let _stmt = (&mut tx)
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
tx.with_annotations($crate::common::test_annotations())
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
tx.with_operation("SELECT", "users")
.prepare_with($dialect.prepare_with_select_sql, &[])
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_prepare_with_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let result = (&mut conn).prepare_with("INVALID SQL GIBBERISH", &[]).await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
let result = conn
.with_annotations($crate::common::test_annotations())
.prepare_with("INVALID SQL GIBBERISH", &[])
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let result = conn
.with_operation("SELECT", "users")
.prepare_with("INVALID SQL GIBBERISH", &[])
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_describe_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let _desc = (&pool).describe("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
pool.with_annotations($crate::common::test_annotations())
.describe("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
pool.with_operation("SELECT", "users")
.describe("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_describe_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let _desc = (&mut conn).describe("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
conn.with_annotations($crate::common::test_annotations())
.describe("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
conn.with_operation("SELECT", "users")
.describe("SELECT 1")
.await
.unwrap();
$crate::common::assert_annotated_span(tel.spans().last().unwrap(), &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_describe_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let _desc = (&mut tx).describe("SELECT 1").await.unwrap();
tx.with_annotations($crate::common::test_annotations())
.describe("SELECT 1")
.await
.unwrap();
tx.with_operation("SELECT", "users")
.describe("SELECT 1")
.await
.unwrap();
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 3);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
$crate::common::assert_annotated_span(&spans[1], &$dialect);
$crate::common::assert_annotated_span(&spans[2], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_describe_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let result = (&mut conn).describe("INVALID SQL GIBBERISH").await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
$crate::common::assert_error_span(&spans[0]);
assert!($crate::common::attr(&spans[0], "db.response.returned_rows").is_none());
let result = conn
.with_annotations($crate::common::test_annotations())
.describe("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
let result = conn
.with_operation("SELECT", "users")
.describe("INVALID SQL GIBBERISH")
.await;
assert!(result.is_err());
let last = tel.spans().last().unwrap().clone();
$crate::common::assert_annotated_span(&last, &$dialect);
$crate::common::assert_error_span(&last);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_operation_duration_metric_carries_full_annotations {
($pool_factory:expr, $dialect:expr) => {{
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
pool.with_annotations(
sqlx_otel::QueryAnnotations::new()
.operation("SELECT")
.collection("users")
.query_summary("users by id")
.stored_procedure("sp_get_users"),
)
.fetch_one("SELECT 1")
.await
.unwrap();
$crate::common::assert_metric_data_point(
&tel,
&[
("db.system.name", $dialect.system),
("db.operation.name", "SELECT"),
("db.collection.name", "users"),
("db.query.summary", "users by id"),
("db.stored_procedure.name", "sp_get_users"),
],
);
}};
}
#[macro_export]
macro_rules! test_operation_duration_metric_carries_sqlstate {
($pool_factory:expr, $expected_code:expr) => {{
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result = pool
.with_operation("SELECT", "nonexistent_table_xyz")
.fetch_one("SELECT * FROM nonexistent_table_xyz")
.await;
assert!(result.is_err(), "expected fetch_one to fail");
let resource_metrics = tel.metrics();
let dp = $crate::common::find_duration_data_point(&resource_metrics)
.expect("db.client.operation.duration data point missing");
assert_eq!(
$crate::common::metric_attr(&dp, "db.response.status_code"),
Some(opentelemetry::Value::String($expected_code.into())),
"metric must carry db.response.status_code for sqlx::Error::Database",
);
}};
}
#[macro_export]
macro_rules! test_annotation_all_four_fields {
($pool_factory:expr, $dialect:expr) => {{
let _ = $dialect; let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
pool.with_annotations(
sqlx_otel::QueryAnnotations::new()
.operation("SELECT")
.collection("users")
.query_summary("users by id")
.stored_procedure("sp_get_users"),
)
.fetch_all("SELECT 1")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].name, "users by id");
assert_eq!(
$crate::common::attr(&spans[0], "db.operation.name"),
Some(opentelemetry::Value::String("SELECT".into())),
);
assert_eq!(
$crate::common::attr(&spans[0], "db.collection.name"),
Some(opentelemetry::Value::String("users".into())),
);
assert_eq!(
$crate::common::attr(&spans[0], "db.query.summary"),
Some(opentelemetry::Value::String("users by id".into())),
);
assert_eq!(
$crate::common::attr(&spans[0], "db.stored_procedure.name"),
Some(opentelemetry::Value::String("sp_get_users".into())),
);
}};
}
#[macro_export]
macro_rules! test_query_summary_drives_span_name {
($pool_factory:expr, $dialect:expr) => {{
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
pool.with_annotations(
sqlx_otel::QueryAnnotations::new()
.operation("SELECT")
.collection("users")
.query_summary("users by tenant"),
)
.fetch_all("SELECT 1")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].name, "users by tenant");
assert_eq!(
$crate::common::attr(&spans[0], "db.query.summary"),
Some(opentelemetry::Value::String("users by tenant".into())),
);
assert_eq!(
$crate::common::attr(&spans[0], "db.operation.name"),
Some(opentelemetry::Value::String("SELECT".into())),
);
assert_eq!(
$crate::common::attr(&spans[0], "db.collection.name"),
Some(opentelemetry::Value::String("users".into())),
);
}};
}
#[macro_export]
macro_rules! test_query_execute_many_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
#[allow(deprecated)]
let mut stream = sqlx::query("SELECT 1; SELECT 2")
.with_annotations($crate::common::test_annotations())
.execute_many(&pool)
.await;
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_fetch_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = sqlx::query("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch(&pool);
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_fetch_many_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
#[allow(deprecated)]
let mut stream = sqlx::query("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch_many(&pool);
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_fetch_all_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let rows = sqlx::query("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
.with_annotations($crate::common::test_annotations())
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 3);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(3))
);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_fetch_one_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let _row = sqlx::query("SELECT 1")
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(1))
);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_execute_with_annotations_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result = sqlx::query("INVALID SQL GIBBERISH")
.with_annotations($crate::common::test_annotations())
.execute(&pool)
.await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_error_span(&spans[0]);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_as_fetch_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = sqlx::query_as::<_, (i32,)>("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch(&pool);
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_as_fetch_many_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
#[allow(deprecated)]
let mut stream = sqlx::query_as::<_, (i32,)>("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch_many(&pool);
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_as_fetch_all_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let rows: Vec<(i32,)> = sqlx::query_as("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 2);
let pool_clone = pool.clone();
let rows: Vec<(i32,)> = tokio::spawn(async move {
sqlx::query_as("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch_all(&pool_clone)
.await
.unwrap()
})
.await
.unwrap();
assert_eq!(rows.len(), 2);
let spans = tel.spans();
assert_eq!(spans.len(), 2);
for span in &spans {
$crate::common::assert_annotated_span(span, &$dialect);
}
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_as_fetch_one_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let row: (i32,) = sqlx::query_as("SELECT 7")
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, 7);
let pool_clone = pool.clone();
let row: (i32,) = tokio::spawn(async move {
sqlx::query_as("SELECT 7")
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool_clone)
.await
.unwrap()
})
.await
.unwrap();
assert_eq!(row.0, 7);
let spans = tel.spans();
assert_eq!(spans.len(), 2);
for span in &spans {
$crate::common::assert_annotated_span(span, &$dialect);
}
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_as_fetch_optional_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let row: Option<(i32,)> = sqlx::query_as("SELECT 1 WHERE 1 = 0")
.with_annotations($crate::common::test_annotations())
.fetch_optional(&pool)
.await
.unwrap();
assert!(row.is_none());
let pool_clone = pool.clone();
let row: Option<(i32,)> = tokio::spawn(async move {
sqlx::query_as("SELECT 1 WHERE 1 = 0")
.with_annotations($crate::common::test_annotations())
.fetch_optional(&pool_clone)
.await
.unwrap()
})
.await
.unwrap();
assert!(row.is_none());
let spans = tel.spans();
assert_eq!(spans.len(), 2);
for span in &spans {
$crate::common::assert_annotated_span(span, &$dialect);
}
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_as_fetch_one_with_annotations_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result: Result<(i32,), _> = sqlx::query_as("INVALID SQL")
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_error_span(&spans[0]);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_scalar_fetch_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = sqlx::query_scalar::<_, i32>("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch(&pool);
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_scalar_fetch_many_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
#[allow(deprecated)]
let mut stream = sqlx::query_scalar::<_, i32>("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch_many(&pool);
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_scalar_fetch_all_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let rows: Vec<i32> = sqlx::query_scalar("SELECT 1 UNION ALL SELECT 2")
.with_annotations($crate::common::test_annotations())
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows, vec![1, 2]);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_scalar_fetch_one_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: i32 = sqlx::query_scalar("SELECT 42")
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(value, 42);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_scalar_fetch_optional_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: Option<i32> = sqlx::query_scalar("SELECT 1 WHERE 1 = 0")
.with_annotations($crate::common::test_annotations())
.fetch_optional(&pool)
.await
.unwrap();
assert!(value.is_none());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_map_position_1_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: i32 = sqlx::query("SELECT 7")
.with_annotations($crate::common::test_annotations())
.map(|row: Row| row.get::<i32, _>(0))
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(value, 7);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_map_position_2_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: i32 = sqlx::query("SELECT 11")
.with_annotations($crate::common::test_annotations())
.map(|row: Row| row.get::<i32, _>(0))
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(value, 11);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_map_position_3_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: i32 = sqlx::query("SELECT 13")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(value, 13);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_try_map_position_3_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: i32 = sqlx::query("SELECT 17")
.try_map(|row: Row| Ok(row.get::<i32, _>(0)))
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(value, 17);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_map_fetch_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut stream = sqlx::query("SELECT 1 UNION ALL SELECT 2")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch(&pool);
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(2))
);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_map_fetch_many_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use futures::StreamExt as _;
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
#[allow(deprecated)]
let mut stream = sqlx::query("SELECT 1 UNION ALL SELECT 2")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch_many(&pool);
while stream.next().await.is_some() {}
drop(stream);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_map_fetch_all_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let rows: Vec<i32> = sqlx::query("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows, vec![1, 2, 3]);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_map_fetch_one_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: i32 = sqlx::query("SELECT 19")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(value, 19);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_map_fetch_optional_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: Option<i32> = sqlx::query("SELECT 1 WHERE 1 = 0")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch_optional(&pool)
.await
.unwrap();
assert!(value.is_none());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_map_compose_after_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: i32 = sqlx::query("SELECT 5")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.map(|n| n * 2)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(value, 10);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_map_try_map_compose_after_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let value: i32 = sqlx::query("SELECT 6")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.try_map(|n: i32| Ok::<_, sqlx::Error>(n + 100))
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(value, 106);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_map_with_annotations_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
let value: i32 = sqlx::query("SELECT 23")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch_one(&mut conn)
.await
.unwrap();
assert_eq!(value, 23);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_map_with_annotations_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
let value: i32 = sqlx::query("SELECT 29")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch_one(&mut tx)
.await
.unwrap();
assert_eq!(value, 29);
tx.commit().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_map_with_annotations_records_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result: Result<i32, _> = sqlx::query("INVALID SQL")
.map(|row: Row| row.get::<i32, _>(0))
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_error_span(&spans[0]);
$crate::common::assert_annotated_error_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_try_map_with_annotations_propagates_mapper_error {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let result: Result<i64, _> = sqlx::query("SELECT 1")
.try_map(|_row: Row| {
Err::<i64, _>(sqlx::Error::Decode(
"intentional decode failure".to_string().into(),
))
})
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await;
assert!(result.is_err());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_builder_with_database_overrides_namespace {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_database("custom_db")
.build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.namespace"),
Some(opentelemetry::Value::String("custom_db".into()))
);
}};
}
#[macro_export]
macro_rules! test_builder_with_host_overrides_server_address {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_host("custom-host")
.build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "server.address"),
Some(opentelemetry::Value::String("custom-host".into()))
);
}};
}
#[macro_export]
macro_rules! test_builder_with_port_overrides_server_port {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw).with_port(9999).build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "server.port"),
Some(opentelemetry::Value::I64(9999))
);
}};
}
#[macro_export]
macro_rules! test_builder_with_network_peer_address {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_network_peer_address("10.0.0.5")
.build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "network.peer.address"),
Some(opentelemetry::Value::String("10.0.0.5".into()))
);
}};
}
#[macro_export]
macro_rules! test_builder_with_pool_name_propagates_to_span_and_metric {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_pool_name("primary-rw")
.build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.client.connection.pool.name"),
Some(opentelemetry::Value::String("primary-rw".into())),
"span must carry db.client.connection.pool.name set via with_pool_name",
);
let resource_metrics = tel.metrics();
let dp = $crate::common::find_duration_data_point(&resource_metrics)
.expect("db.client.operation.duration data point missing");
assert_eq!(
$crate::common::metric_attr(&dp, "db.client.connection.pool.name"),
Some(opentelemetry::Value::String("primary-rw".into())),
"metric must carry db.client.connection.pool.name set via with_pool_name",
);
}};
}
#[macro_export]
macro_rules! test_builder_default_network_protocol_name {
($raw_pool_factory:expr, $expected:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw).build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
let actual_span = $crate::common::attr(&spans[0], "network.protocol.name");
let expected: Option<&str> = $expected;
assert_eq!(
actual_span,
expected.map(|s| opentelemetry::Value::String(s.to_owned().into())),
"span network.protocol.name must match the backend default",
);
let resource_metrics = tel.metrics();
let dp = $crate::common::find_duration_data_point(&resource_metrics)
.expect("db.client.operation.duration data point missing");
let actual_metric = $crate::common::metric_attr(&dp, "network.protocol.name");
assert_eq!(
actual_metric,
expected.map(|s| opentelemetry::Value::String(s.to_owned().into())),
"metric network.protocol.name must match the backend default",
);
}};
}
#[macro_export]
macro_rules! test_builder_with_network_protocol_name_overrides {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_network_protocol_name("custom-proto")
.build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "network.protocol.name"),
Some(opentelemetry::Value::String("custom-proto".into())),
"with_network_protocol_name must override the default on the span",
);
let resource_metrics = tel.metrics();
let dp = $crate::common::find_duration_data_point(&resource_metrics)
.expect("db.client.operation.duration data point missing");
assert_eq!(
$crate::common::metric_attr(&dp, "network.protocol.name"),
Some(opentelemetry::Value::String("custom-proto".into())),
"with_network_protocol_name must override the default on the metric",
);
}};
}
#[macro_export]
macro_rules! test_builder_with_network_transport {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_network_transport("tcp")
.build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "network.transport"),
Some(opentelemetry::Value::String("tcp".into())),
"span must carry network.transport set via with_network_transport",
);
let resource_metrics = tel.metrics();
let dp = $crate::common::find_duration_data_point(&resource_metrics)
.expect("db.client.operation.duration data point missing");
assert_eq!(
$crate::common::metric_attr(&dp, "network.transport"),
Some(opentelemetry::Value::String("tcp".into())),
"metric must carry network.transport set via with_network_transport",
);
}};
}
#[macro_export]
macro_rules! test_builder_with_network_peer_port {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_network_peer_port(5433)
.build();
let _ = (&pool).fetch_optional("SELECT 1").await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "network.peer.port"),
Some(opentelemetry::Value::I64(5433))
);
}};
}
#[macro_export]
macro_rules! test_pool_close_and_is_closed {
($pool_factory:expr, $dialect:expr) => {{
let _ = $dialect;
let _tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
assert!(!pool.is_closed());
pool.close().await;
assert!(pool.is_closed());
}};
}
#[macro_export]
macro_rules! test_query_text_mode_off_suppresses_sql {
($raw_pool_factory:expr, $dialect:expr) => {{
let tel = $crate::common::TestTelemetry::install();
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_query_text_mode(sqlx_otel::QueryTextMode::Off)
.build();
let _: Option<(i32,)> = sqlx::query_as("SELECT 1")
.fetch_optional(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].span_kind, opentelemetry::trace::SpanKind::Client);
assert_eq!(
$crate::common::attr(&spans[0], "db.system.name"),
Some(opentelemetry::Value::String($dialect.system.into()))
);
assert!($crate::common::attr(&spans[0], "db.namespace").is_some());
assert!(
$crate::common::attr(&spans[0], "db.query.text").is_none(),
"db.query.text should not be present when QueryTextMode::Off"
);
$crate::common::assert_metric_for_system(&tel, $dialect.system);
}};
}
#[macro_export]
macro_rules! test_execute_records_affected_rows {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
$crate::fresh_table!(
&pool,
"affected_test",
&format!("id {}, name {}", $dialect.id_pk_column, $dialect.text_column)
);
tel.reset();
(&pool)
.execute(
"INSERT INTO affected_test (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(3)),
"inserting 3 rows should affect 3 rows"
);
$crate::common::assert_metric_for_system(&tel, $dialect.system);
$crate::common::assert_affected_rows_metric(&tel, $dialect.system);
tel.reset();
(&pool).execute($dialect.upsert_sql).await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64($dialect.upsert_affected_rows)),
"upsert affected_rows differs per backend"
);
$crate::common::assert_affected_rows_metric(&tel, $dialect.system);
tel.reset();
(&pool)
.execute($dialect.string_concat_update_sql)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(2)),
"updating two rows should affect 2 rows"
);
$crate::common::assert_affected_rows_metric(&tel, $dialect.system);
tel.reset();
(&pool)
.execute("DELETE FROM affected_test WHERE id IN (1, 2, 3)")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(3)),
"deleting three rows should affect 3 rows"
);
$crate::common::assert_affected_rows_metric(&tel, $dialect.system);
tel.reset();
(&pool)
.execute("DELETE FROM affected_test WHERE id = 999")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.affected_rows"),
Some(opentelemetry::Value::I64(0)),
"deleting non-existent rows should affect 0 rows"
);
$crate::common::assert_metric_for_system(&tel, $dialect.system);
$crate::common::assert_affected_rows_metric(&tel, $dialect.system);
}};
}
#[macro_export]
macro_rules! test_transaction_rollback {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let pool = $pool_factory;
let drop_sql = "DROP TABLE IF EXISTS rollback_test";
(&pool).execute(drop_sql).await.unwrap();
let tel = $crate::common::TestTelemetry::install();
let mut tx = pool.begin().await.unwrap();
let create_sql = format!("CREATE TABLE rollback_test (id {})", $dialect.id_pk_column);
(&mut tx).execute(create_sql.as_str()).await.unwrap();
tx.rollback().await.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
}};
}
#[macro_export]
macro_rules! test_query_text_mode_obfuscated_replaces_literals {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_query_text_mode(sqlx_otel::QueryTextMode::Obfuscated)
.build();
let tel = $crate::common::TestTelemetry::install();
let _row = (&pool)
.fetch_optional("SELECT 1, 'alice', 3.14")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.query.text"),
Some(opentelemetry::Value::String("SELECT ?, ?, ?".into()))
);
}};
}
#[macro_export]
macro_rules! test_query_text_mode_full_compacts_multiline_sql {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw).build();
let tel = $crate::common::TestTelemetry::install();
let _row = (&pool)
.fetch_optional("SELECT\n 1,\n 2,\n 3")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.query.text"),
Some(opentelemetry::Value::String("SELECT 1, 2, 3".into())),
"multi-line SQL must be collapsed to single-space-separated tokens"
);
}};
}
#[macro_export]
macro_rules! test_query_text_mode_obfuscated_compacts_multiline_sql {
($raw_pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let _ = $dialect;
let raw = $raw_pool_factory;
let pool = sqlx_otel::PoolBuilder::from(raw)
.with_query_text_mode(sqlx_otel::QueryTextMode::Obfuscated)
.build();
let tel = $crate::common::TestTelemetry::install();
let _row = (&pool)
.fetch_optional("SELECT 1,\n 'alice',\n 3.14")
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
assert_eq!(
$crate::common::attr(&spans[0], "db.query.text"),
Some(opentelemetry::Value::String("SELECT ?, ?, ?".into())),
"Obfuscated mode must redact literals and collapse multi-line whitespace"
);
}};
}
#[macro_export]
macro_rules! test_fetch_optional_records_zero_rows {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Executor as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
$crate::fresh_table!(
&pool,
"empty_table",
&format!("id {}", $dialect.id_pk_column)
);
tel.reset();
let result = (&pool)
.fetch_optional("SELECT id FROM empty_table")
.await
.unwrap();
assert!(result.is_none());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_common_span_attributes(&spans[0], $dialect.system);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
$crate::common::assert_metric_for_system(&tel, $dialect.system);
}};
}
#[macro_export]
macro_rules! test_query_bind_first_then_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let row = sqlx::query($dialect.bind_two_sum_sql)
.bind(2_i32)
.bind(3_i32)
.with_annotations($crate::common::test_annotations())
.fetch_one(&pool)
.await
.unwrap();
let sum: i64 = row.try_get("sum").unwrap();
assert_eq!(sum, 5);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_annotations_first_then_bind_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx::Row as _;
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let row = sqlx::query($dialect.bind_two_sum_sql)
.with_annotations($crate::common::test_annotations())
.bind(10_i32)
.bind(20_i32)
.fetch_one(&pool)
.await
.unwrap();
let sum: i64 = row.try_get("sum").unwrap();
assert_eq!(sum, 30);
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_execute_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
sqlx::query("SELECT 1")
.with_annotations($crate::common::test_annotations())
.execute(&pool)
.await
.unwrap();
let pool_clone = pool.clone();
tokio::spawn(async move {
sqlx::query("SELECT 1")
.with_annotations($crate::common::test_annotations())
.execute(&pool_clone)
.await
.unwrap();
})
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 2);
for span in &spans {
$crate::common::assert_annotated_span(span, &$dialect);
assert!($crate::common::attr(span, "db.response.affected_rows").is_some());
}
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_execute_with_annotations_via_connection {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut conn = pool.acquire().await.unwrap();
sqlx::query("SELECT 1")
.with_annotations($crate::common::test_annotations())
.execute(&mut conn)
.await
.unwrap();
drop(conn);
let pool_clone = pool.clone();
tokio::spawn(async move {
let mut conn = pool_clone.acquire().await.unwrap();
sqlx::query("SELECT 1")
.with_annotations($crate::common::test_annotations())
.execute(&mut conn)
.await
.unwrap();
})
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 2);
for span in &spans {
$crate::common::assert_annotated_span(span, &$dialect);
}
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_execute_with_annotations_via_transaction {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let mut tx = pool.begin().await.unwrap();
sqlx::query("SELECT 1")
.with_annotations($crate::common::test_annotations())
.execute(&mut tx)
.await
.unwrap();
tx.commit().await.unwrap();
let pool_clone = pool.clone();
tokio::spawn(async move {
let mut tx = pool_clone.begin().await.unwrap();
sqlx::query("SELECT 1")
.with_annotations($crate::common::test_annotations())
.execute(&mut tx)
.await
.unwrap();
tx.commit().await.unwrap();
})
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 2);
for span in &spans {
$crate::common::assert_annotated_span(span, &$dialect);
}
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_with_operation_shorthand_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
sqlx::query("SELECT 1")
.with_operation("SELECT", "users")
.execute(&pool)
.await
.unwrap();
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_query_fetch_optional_with_annotations_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let tel = $crate::common::TestTelemetry::install();
let pool = $pool_factory;
let row = sqlx::query("SELECT 1 WHERE 1 = 0")
.with_annotations($crate::common::test_annotations())
.fetch_optional(&pool)
.await
.unwrap();
assert!(row.is_none());
let spans = tel.spans();
assert_eq!(spans.len(), 1);
$crate::common::assert_annotated_span(&spans[0], &$dialect);
assert_eq!(
$crate::common::attr(&spans[0], "db.response.returned_rows"),
Some(opentelemetry::Value::I64(0))
);
$crate::common::assert_annotated_metric(&tel, &$dialect);
}};
}
#[macro_export]
macro_rules! test_executor_side_query_side_parity_via_pool {
($pool_factory:expr, $dialect:expr) => {{
use sqlx_otel::QueryAnnotateExt as _;
let _ = $dialect;
let pool = $pool_factory;
let tel = $crate::common::TestTelemetry::install();
let _ = sqlx::query("SELECT 1")
.execute(pool.with_operation("SELECT", "users"))
.await
.unwrap();
let exec_spans = tel.spans();
tel.reset();
let _ = sqlx::query("SELECT 1")
.with_operation("SELECT", "users")
.execute(&pool)
.await
.unwrap();
let query_spans = tel.spans();
tel.reset();
$crate::common::assert_span_parity("query::execute", &exec_spans, &query_spans);
let _: Option<(i32,)> = sqlx::query_as("SELECT 1")
.fetch_optional(pool.with_operation("SELECT", "users"))
.await
.unwrap();
let exec_spans = tel.spans();
tel.reset();
let _: Option<(i32,)> = sqlx::query_as("SELECT 1")
.with_operation("SELECT", "users")
.fetch_optional(&pool)
.await
.unwrap();
let query_spans = tel.spans();
tel.reset();
$crate::common::assert_span_parity("query_as::fetch_optional", &exec_spans, &query_spans);
let _: i32 = sqlx::query_scalar("SELECT 1")
.fetch_one(pool.with_operation("SELECT", "users"))
.await
.unwrap();
let exec_spans = tel.spans();
tel.reset();
let _: i32 = sqlx::query_scalar("SELECT 1")
.with_operation("SELECT", "users")
.fetch_one(&pool)
.await
.unwrap();
let query_spans = tel.spans();
$crate::common::assert_span_parity("query_scalar::fetch_one", &exec_spans, &query_spans);
}};
}