Skip to main content

sqlx_otel/
executor.rs

1use std::borrow::Cow;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use futures::Stream;
7use futures::stream::BoxStream;
8use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
9use opentelemetry::{Context as OtelContext, KeyValue};
10use opentelemetry_semantic_conventions::attribute;
11
12use crate::annotations::QueryAnnotations;
13use crate::attributes::{self, ConnectionAttributes, QueryTextMode};
14use crate::database::Database;
15use crate::metrics::Metrics;
16
17// ---------------------------------------------------------------------------
18// Span helpers
19// ---------------------------------------------------------------------------
20
21/// Append the four per-query semantic convention annotation attributes
22/// (`db.operation.name`, `db.collection.name`, `db.query.summary`, `db.stored_procedure.name`) onto
23/// the supplied vector, one push per field that is `Some`. Used by both the span attribute builder
24/// and `begin_query_span`'s metric attribute list so the two emit identical annotation-derived
25/// keys.
26fn append_annotation_attrs(kv: &mut Vec<KeyValue>, annotations: Option<&QueryAnnotations>) {
27    let Some(ann) = annotations else { return };
28    if let Some(ref op) = ann.operation {
29        kv.push(KeyValue::new(attribute::DB_OPERATION_NAME, op.clone()));
30    }
31    if let Some(ref coll) = ann.collection {
32        kv.push(KeyValue::new(attribute::DB_COLLECTION_NAME, coll.clone()));
33    }
34    if let Some(ref summary) = ann.query_summary {
35        kv.push(KeyValue::new(attribute::DB_QUERY_SUMMARY, summary.clone()));
36    }
37    if let Some(ref sp) = ann.stored_procedure {
38        kv.push(KeyValue::new(
39            attribute::DB_STORED_PROCEDURE_NAME,
40            sp.clone(),
41        ));
42    }
43}
44
45/// Build span attributes for a query, combining connection-level and per-query values.
46///
47/// When `annotations` is provided, the four per-query semantic convention attributes
48/// (`db.operation.name`, `db.collection.name`, `db.query.summary`,
49/// `db.stored_procedure.name`) are included for any field that is set.
50fn build_attributes(
51    attrs: &ConnectionAttributes,
52    sql: Option<&str>,
53    annotations: Option<&QueryAnnotations>,
54) -> Vec<KeyValue> {
55    let mut kv = attrs.base_key_values();
56    append_annotation_attrs(&mut kv, annotations);
57    if let Some(sql) = sql {
58        match attrs.query_text_mode {
59            QueryTextMode::Full => {
60                kv.push(KeyValue::new(
61                    attribute::DB_QUERY_TEXT,
62                    crate::compact::compact_whitespace(sql),
63                ));
64            }
65            QueryTextMode::Obfuscated => {
66                let obfuscated = crate::obfuscate::obfuscate(sql);
67                kv.push(KeyValue::new(
68                    attribute::DB_QUERY_TEXT,
69                    crate::compact::compact_whitespace(&obfuscated),
70                ));
71            }
72            QueryTextMode::Off => {}
73        }
74    }
75    kv
76}
77
78/// Create an OpenTelemetry span for a database operation and return a context containing it.
79fn start_span(name: &str, span_attrs: Vec<KeyValue>) -> (OtelContext, Instant) {
80    let tracer = opentelemetry::global::tracer("sqlx-otel");
81    let span = tracer
82        .span_builder(name.to_owned())
83        .with_kind(SpanKind::Client)
84        .with_attributes(span_attrs)
85        .start(&tracer);
86    let cx = OtelContext::current_with_span(span);
87    (cx, Instant::now())
88}
89
90/// Start an instrumented query: derive the span name from the connection attributes and per-query
91/// annotations, build the span and metric attribute lists, and open the span.
92///
93/// Returns the span's context, the timing reference for `finish()`, and the metric attribute list.
94/// This consolidates the boilerplate that every `Executor` method shares before delegating to the
95/// inner `SQLx` call.
96///
97/// The returned `metric_attrs` mirror the bounded portion of the span attribute set: connection
98/// attributes plus the four annotation-derived attributes when present, plus error-path attributes
99/// (`error.type`, `db.response.status_code`) appended later by `record_error`. The unbounded
100/// `db.query.text` attribute is deliberately excluded; `db.query.summary` is caller-controlled and
101/// can be unbounded – that cardinality cost is inherited from the span side.
102fn begin_query_span(
103    attrs: &ConnectionAttributes,
104    sql: Option<&str>,
105    annotations: Option<&QueryAnnotations>,
106) -> (OtelContext, Instant, Vec<KeyValue>) {
107    let (op, coll, summary) = annotations.map_or((None, None, None), |a| {
108        (
109            a.operation.as_deref(),
110            a.collection.as_deref(),
111            a.query_summary.as_deref(),
112        )
113    });
114    let name = attributes::span_name(attrs.system, op, coll, summary);
115    let span_attrs = build_attributes(attrs, sql, annotations);
116    let mut metric_attrs = attrs.base_key_values();
117    append_annotation_attrs(&mut metric_attrs, annotations);
118    let (cx, start) = start_span(&name, span_attrs);
119    (cx, start, metric_attrs)
120}
121
122/// Classify a `sqlx::Error` variant into a string suitable for `error.type`.
123fn error_type(err: &sqlx::Error) -> &'static str {
124    match err {
125        sqlx::Error::Configuration(_) => "Configuration",
126        sqlx::Error::Database(_) => "Database",
127        sqlx::Error::Io(_) => "Io",
128        sqlx::Error::Tls(_) => "Tls",
129        sqlx::Error::Protocol(_) => "Protocol",
130        sqlx::Error::RowNotFound => "RowNotFound",
131        sqlx::Error::TypeNotFound { .. } => "TypeNotFound",
132        sqlx::Error::ColumnIndexOutOfBounds { .. } => "ColumnIndexOutOfBounds",
133        sqlx::Error::ColumnNotFound(_) => "ColumnNotFound",
134        sqlx::Error::ColumnDecode { .. } => "ColumnDecode",
135        sqlx::Error::Decode(_) => "Decode",
136        sqlx::Error::AnyDriverError(_) => "AnyDriverError",
137        sqlx::Error::PoolTimedOut => "PoolTimedOut",
138        sqlx::Error::PoolClosed => "PoolClosed",
139        sqlx::Error::WorkerCrashed => "WorkerCrashed",
140        sqlx::Error::Migrate(_) => "Migrate",
141        _ => "Unknown",
142    }
143}
144
145/// Record an error on the span within the given context: set status, `error.type`, and add an
146/// exception event. Also append `error.type` and `db.response.status_code` (SQLSTATE for
147/// `sqlx::Error::Database`) onto `metric_attrs` so the histogram emission carries the same
148/// error-path dimensions as the span. Single source of truth for `error_type(err)` and SQLSTATE
149/// extraction.
150fn record_error(cx: &OtelContext, err: &sqlx::Error, metric_attrs: &mut Vec<KeyValue>) {
151    let span = cx.span();
152    let kind = error_type(err);
153    span.set_status(Status::Error {
154        description: Cow::Owned(err.to_string()),
155    });
156    span.set_attribute(KeyValue::new(attribute::ERROR_TYPE, kind));
157    metric_attrs.push(KeyValue::new(attribute::ERROR_TYPE, kind));
158    // Extract SQLSTATE or database-specific error code when available.
159    if let sqlx::Error::Database(db_err) = err {
160        if let Some(code) = db_err.code() {
161            let code = code.into_owned();
162            span.set_attribute(KeyValue::new(
163                attribute::DB_RESPONSE_STATUS_CODE,
164                code.clone(),
165            ));
166            metric_attrs.push(KeyValue::new(attribute::DB_RESPONSE_STATUS_CODE, code));
167        }
168    }
169    span.add_event(
170        "exception",
171        vec![
172            KeyValue::new("exception.type", kind),
173            KeyValue::new("exception.message", err.to_string()),
174        ],
175    );
176}
177
178/// Record success attributes (returned rows) on the span.
179fn record_rows(cx: &OtelContext, rows: u64) {
180    cx.span().set_attribute(KeyValue::new(
181        attribute::DB_RESPONSE_RETURNED_ROWS,
182        i64::try_from(rows).unwrap_or(i64::MAX),
183    ));
184}
185
186/// Record affected rows on the span (for `execute` operations).
187fn record_affected_rows(cx: &OtelContext, rows: u64) {
188    cx.span().set_attribute(KeyValue::new(
189        "db.response.affected_rows",
190        i64::try_from(rows).unwrap_or(i64::MAX),
191    ));
192}
193
194/// End the span and record metrics. `returned_rows` is `Some` for `fetch*` paths,
195/// `affected_rows` is `Some` for `execute` paths; both are `None` for paths that report
196/// neither (e.g. `prepare` / `describe` / `execute_many`'s streaming aggregate).
197fn finish(
198    cx: &OtelContext,
199    start: Instant,
200    returned_rows: Option<u64>,
201    affected_rows: Option<u64>,
202    metrics: &Metrics,
203    attrs: &[KeyValue],
204) {
205    cx.span().end();
206    metrics.record(start.elapsed(), returned_rows, affected_rows, attrs);
207}
208
209/// Await a future, record any error on the span, then finish. Used by `execute`, `prepare`,
210/// `prepare_with`, and `describe` which share the same instrumentation pattern.
211async fn execute_instrumented<T>(
212    fut: futures::future::BoxFuture<'_, Result<T, sqlx::Error>>,
213    cx: OtelContext,
214    start: Instant,
215    metrics: std::sync::Arc<Metrics>,
216    mut metric_attrs: Vec<KeyValue>,
217) -> Result<T, sqlx::Error> {
218    let result = fut.await;
219    if let Err(err) = &result {
220        record_error(&cx, err, &mut metric_attrs);
221    }
222    finish(&cx, start, None, None, &metrics, &metric_attrs);
223    result
224}
225
226// ---------------------------------------------------------------------------
227// InstrumentedStream – keeps the span alive for streaming operations
228// ---------------------------------------------------------------------------
229
230/// Trait that determines how many rows a stream item represents.
231trait RowCounter<T> {
232    /// Return the number of rows this item contributes.
233    fn count(item: &T) -> u64;
234}
235
236/// Counts every item as one row. Used for `fetch` (which yields `Row`).
237struct CountAll;
238
239impl<T> RowCounter<T> for CountAll {
240    fn count(_item: &T) -> u64 {
241        1
242    }
243}
244
245/// Counts only `Either::Right` items as rows. Used for `fetch_many` (which yields
246/// `Either<QueryResult, Row>`).
247struct CountRight;
248
249impl<L, R> RowCounter<sqlx::Either<L, R>> for CountRight {
250    fn count(item: &sqlx::Either<L, R>) -> u64 {
251        u64::from(item.is_right())
252    }
253}
254
255/// Counts nothing. Used for `execute_many` (which yields `QueryResult`, not rows).
256struct CountNone;
257
258impl<T> RowCounter<T> for CountNone {
259    fn count(_item: &T) -> u64 {
260        0
261    }
262}
263
264/// A stream wrapper that holds an OpenTelemetry context (keeping the span alive), counts rows,
265/// and records metrics when the stream completes or is dropped.
266struct InstrumentedStream<S, C> {
267    inner: S,
268    cx: OtelContext,
269    start: Instant,
270    rows: u64,
271    metrics: std::sync::Arc<Metrics>,
272    metric_attrs: Vec<KeyValue>,
273    error_recorded: bool,
274    finished: bool,
275    _counter: std::marker::PhantomData<C>,
276}
277
278impl<S, C> InstrumentedStream<S, C> {
279    fn new(
280        inner: S,
281        cx: OtelContext,
282        start: Instant,
283        metrics: std::sync::Arc<Metrics>,
284        metric_attrs: Vec<KeyValue>,
285    ) -> Self {
286        Self {
287            inner,
288            cx,
289            start,
290            rows: 0,
291            metrics,
292            metric_attrs,
293            error_recorded: false,
294            finished: false,
295            _counter: std::marker::PhantomData,
296        }
297    }
298
299    fn complete(&mut self) {
300        if !self.finished {
301            self.finished = true;
302            record_rows(&self.cx, self.rows);
303            finish(
304                &self.cx,
305                self.start,
306                Some(self.rows),
307                None,
308                &self.metrics,
309                &self.metric_attrs,
310            );
311        }
312    }
313}
314
315// Safety: all fields are Unpin (inner S is bounded Unpin, the rest are owned values).
316// PhantomData<C> prevents auto-Unpin, so we impl it explicitly.
317impl<S: Unpin, C> Unpin for InstrumentedStream<S, C> {}
318
319impl<S, T, C> Stream for InstrumentedStream<S, C>
320where
321    S: Stream<Item = Result<T, sqlx::Error>> + Unpin,
322    C: RowCounter<T>,
323{
324    type Item = Result<T, sqlx::Error>;
325
326    fn poll_next(mut self: Pin<&mut Self>, task_cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
327        match Pin::new(&mut self.inner).poll_next(task_cx) {
328            Poll::Ready(Some(Ok(item))) => {
329                self.rows += C::count(&item);
330                Poll::Ready(Some(Ok(item)))
331            }
332            Poll::Ready(Some(Err(err))) => {
333                if !self.error_recorded {
334                    self.error_recorded = true;
335                    // Re-borrow `&mut *self` to split the disjoint-field borrow:
336                    // `record_error` needs `&self.cx` (immutable) and `&mut self.metric_attrs`
337                    // (mutable) simultaneously. Going through `Pin<&mut Self>::deref_mut`
338                    // (sound here because of the explicit `Unpin` impl below) lets the
339                    // borrow checker see the two fields as distinct.
340                    let this = &mut *self;
341                    record_error(&this.cx, &err, &mut this.metric_attrs);
342                }
343                Poll::Ready(Some(Err(err)))
344            }
345            Poll::Ready(None) => {
346                self.complete();
347                Poll::Ready(None)
348            }
349            Poll::Pending => Poll::Pending,
350        }
351    }
352}
353
354impl<S, C> Drop for InstrumentedStream<S, C> {
355    fn drop(&mut self) {
356        self.complete();
357    }
358}
359
360// ---------------------------------------------------------------------------
361// Macro to reduce Executor impl boilerplate
362// ---------------------------------------------------------------------------
363
364/// Generate the full `sqlx::Executor` implementation for one of our wrapper types.
365///
366/// Each method extracts the SQL string, builds an OpenTelemetry span with connection attributes,
367/// delegates to the inner executor, and records metrics and errors on completion.
368///
369/// Two forms are supported:
370/// - `impl_executor!(Type, self => inner)` – no annotations (passes `None`).
371/// - `impl_executor!(Type, self => inner, annotations: expr)` – per-query annotations.
372macro_rules! impl_executor {
373    ($ty:ty, $self_:ident => $inner:expr) => {
374        impl_executor!(@impl $ty, $self_ => $inner, None);
375    };
376    ($ty:ty, $self_:ident => $inner:expr, annotations: $ann:expr) => {
377        impl_executor!(@impl $ty, $self_ => $inner, $ann);
378    };
379    (@impl $ty:ty, $self_:ident => $inner:expr, $ann:expr) => {
380        impl<'c, DB> sqlx::Executor<'c> for $ty
381        where
382            DB: Database,
383            for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
384        {
385            type Database = DB;
386
387            /// Execute the query and return the total number of rows affected.
388            fn execute<'e, 'q: 'e, E>(
389                $self_,
390                query: E,
391            ) -> futures::future::BoxFuture<
392                'e,
393                Result<<DB as sqlx::Database>::QueryResult, sqlx::Error>,
394            >
395            where
396                E: 'q + sqlx::Execute<'q, DB>,
397                'c: 'e,
398            {
399                let sql = query.sql().to_owned();
400                let state = $self_.state.clone();
401                let (cx, start, mut metric_attrs) =
402                    begin_query_span(&state.attrs, Some(&sql), $ann);
403                let fut = ($inner).execute(query);
404                Box::pin(async move {
405                    let result = fut.await;
406                    let affected = match &result {
407                        Ok(qr) => {
408                            let n = DB::rows_affected(qr);
409                            record_affected_rows(&cx, n);
410                            Some(n)
411                        }
412                        Err(err) => {
413                            record_error(&cx, err, &mut metric_attrs);
414                            None
415                        }
416                    };
417                    finish(&cx, start, None, affected, &state.metrics, &metric_attrs);
418                    result
419                })
420            }
421
422            /// Execute multiple queries and return the rows affected from each query,
423            /// in a stream.
424            fn execute_many<'e, 'q: 'e, E>(
425                $self_,
426                query: E,
427            ) -> BoxStream<'e, Result<<DB as sqlx::Database>::QueryResult, sqlx::Error>>
428            where
429                E: 'q + sqlx::Execute<'q, DB>,
430                'c: 'e,
431            {
432                let sql = query.sql().to_owned();
433                let state = $self_.state.clone();
434                let (cx, start, metric_attrs) =
435                    begin_query_span(&state.attrs, Some(&sql), $ann);
436                let stream = ($inner).execute_many(query);
437                Box::pin(InstrumentedStream::<_, CountNone>::new(
438                    stream,
439                    cx,
440                    start,
441                    state.metrics,
442                    metric_attrs,
443                ))
444            }
445
446            /// Execute the query and return the generated results as a stream.
447            fn fetch<'e, 'q: 'e, E>(
448                $self_,
449                query: E,
450            ) -> BoxStream<'e, Result<<DB as sqlx::Database>::Row, sqlx::Error>>
451            where
452                E: 'q + sqlx::Execute<'q, DB>,
453                'c: 'e,
454            {
455                let sql = query.sql().to_owned();
456                let state = $self_.state.clone();
457                let (cx, start, metric_attrs) =
458                    begin_query_span(&state.attrs, Some(&sql), $ann);
459                let stream = ($inner).fetch(query);
460                Box::pin(InstrumentedStream::<_, CountAll>::new(
461                    stream,
462                    cx,
463                    start,
464                    state.metrics,
465                    metric_attrs,
466                ))
467            }
468
469            /// Execute multiple queries and return the generated results as a stream
470            /// from each query, in a stream.
471            fn fetch_many<'e, 'q: 'e, E>(
472                $self_,
473                query: E,
474            ) -> BoxStream<
475                'e,
476                Result<
477                    sqlx::Either<
478                        <DB as sqlx::Database>::QueryResult,
479                        <DB as sqlx::Database>::Row,
480                    >,
481                    sqlx::Error,
482                >,
483            >
484            where
485                E: 'q + sqlx::Execute<'q, DB>,
486                'c: 'e,
487            {
488                let sql = query.sql().to_owned();
489                let state = $self_.state.clone();
490                let (cx, start, metric_attrs) =
491                    begin_query_span(&state.attrs, Some(&sql), $ann);
492                let stream = ($inner).fetch_many(query);
493                Box::pin(InstrumentedStream::<_, CountRight>::new(
494                    stream,
495                    cx,
496                    start,
497                    state.metrics,
498                    metric_attrs,
499                ))
500            }
501
502            /// Execute the query and return all the generated results, collected into
503            /// a [`Vec`].
504            fn fetch_all<'e, 'q: 'e, E>(
505                $self_,
506                query: E,
507            ) -> futures::future::BoxFuture<
508                'e,
509                Result<Vec<<DB as sqlx::Database>::Row>, sqlx::Error>,
510            >
511            where
512                E: 'q + sqlx::Execute<'q, DB>,
513                'c: 'e,
514            {
515                let sql = query.sql().to_owned();
516                let state = $self_.state.clone();
517                let (cx, start, mut metric_attrs) =
518                    begin_query_span(&state.attrs, Some(&sql), $ann);
519                let fut = ($inner).fetch_all(query);
520                Box::pin(async move {
521                    let result = fut.await;
522                    match &result {
523                        Ok(rows) => {
524                            let count = rows.len() as u64;
525                            record_rows(&cx, count);
526                            finish(&cx, start, Some(count), None, &state.metrics, &metric_attrs);
527                        }
528                        Err(err) => {
529                            record_error(&cx, err, &mut metric_attrs);
530                            finish(&cx, start, None, None, &state.metrics, &metric_attrs);
531                        }
532                    }
533                    result
534                })
535            }
536
537            /// Execute the query and returns exactly one row.
538            fn fetch_one<'e, 'q: 'e, E>(
539                $self_,
540                query: E,
541            ) -> futures::future::BoxFuture<
542                'e,
543                Result<<DB as sqlx::Database>::Row, sqlx::Error>,
544            >
545            where
546                E: 'q + sqlx::Execute<'q, DB>,
547                'c: 'e,
548            {
549                let sql = query.sql().to_owned();
550                let state = $self_.state.clone();
551                let (cx, start, mut metric_attrs) =
552                    begin_query_span(&state.attrs, Some(&sql), $ann);
553                let fut = ($inner).fetch_one(query);
554                Box::pin(async move {
555                    let result = fut.await;
556                    match &result {
557                        Ok(_) => {
558                            record_rows(&cx, 1);
559                            finish(&cx, start, Some(1), None, &state.metrics, &metric_attrs);
560                        }
561                        Err(err) => {
562                            record_error(&cx, err, &mut metric_attrs);
563                            finish(&cx, start, None, None, &state.metrics, &metric_attrs);
564                        }
565                    }
566                    result
567                })
568            }
569
570            /// Execute the query and returns at most one row.
571            fn fetch_optional<'e, 'q: 'e, E>(
572                $self_,
573                query: E,
574            ) -> futures::future::BoxFuture<
575                'e,
576                Result<Option<<DB as sqlx::Database>::Row>, sqlx::Error>,
577            >
578            where
579                E: 'q + sqlx::Execute<'q, DB>,
580                'c: 'e,
581            {
582                let sql = query.sql().to_owned();
583                let state = $self_.state.clone();
584                let (cx, start, mut metric_attrs) =
585                    begin_query_span(&state.attrs, Some(&sql), $ann);
586                let fut = ($inner).fetch_optional(query);
587                Box::pin(async move {
588                    let result = fut.await;
589                    match &result {
590                        Ok(maybe_row) => {
591                            let count = u64::from(maybe_row.is_some());
592                            record_rows(&cx, count);
593                            finish(&cx, start, Some(count), None, &state.metrics, &metric_attrs);
594                        }
595                        Err(err) => {
596                            record_error(&cx, err, &mut metric_attrs);
597                            finish(&cx, start, None, None, &state.metrics, &metric_attrs);
598                        }
599                    }
600                    result
601                })
602            }
603
604            /// Prepare the SQL query to inspect the type information of its parameters
605            /// and results.
606            ///
607            /// Be advised that when using the `query`, `query_as`, or `query_scalar`
608            /// functions, the query is transparently prepared and executed.
609            ///
610            /// This explicit API is provided to allow access to the statement metadata
611            /// available after it prepared but before the first row is returned.
612            fn prepare<'e, 'q: 'e>(
613                $self_,
614                query: &'q str,
615            ) -> futures::future::BoxFuture<
616                'e,
617                Result<<DB as sqlx::Database>::Statement<'q>, sqlx::Error>,
618            >
619            where
620                'c: 'e,
621            {
622                let state = $self_.state.clone();
623                let (cx, start, metric_attrs) = begin_query_span(&state.attrs, Some(query), $ann);
624                let fut = ($inner).prepare(query);
625                Box::pin(execute_instrumented(
626                    fut, cx, start, state.metrics, metric_attrs,
627                ))
628            }
629
630            /// Prepare the SQL query, with parameter type information, to inspect the
631            /// type information about its parameters and results.
632            ///
633            /// Only some database drivers (Postgres, MSSQL) can take advantage of
634            /// this extra information to influence parameter type inference.
635            fn prepare_with<'e, 'q: 'e>(
636                $self_,
637                sql: &'q str,
638                parameters: &'e [<DB as sqlx::Database>::TypeInfo],
639            ) -> futures::future::BoxFuture<
640                'e,
641                Result<<DB as sqlx::Database>::Statement<'q>, sqlx::Error>,
642            >
643            where
644                'c: 'e,
645            {
646                let state = $self_.state.clone();
647                let (cx, start, metric_attrs) = begin_query_span(&state.attrs, Some(sql), $ann);
648                let fut = ($inner).prepare_with(sql, parameters);
649                Box::pin(execute_instrumented(
650                    fut, cx, start, state.metrics, metric_attrs,
651                ))
652            }
653
654            /// Describe the SQL query and return type information about its parameters
655            /// and results.
656            ///
657            /// This is used by compile-time verification in the query macros to
658            /// power their type inference.
659            #[doc(hidden)]
660            fn describe<'e, 'q: 'e>(
661                $self_,
662                sql: &'q str,
663            ) -> futures::future::BoxFuture<
664                'e,
665                Result<sqlx::Describe<DB>, sqlx::Error>,
666            >
667            where
668                'c: 'e,
669            {
670                let state = $self_.state.clone();
671                let (cx, start, metric_attrs) = begin_query_span(&state.attrs, Some(sql), $ann);
672                let fut = ($inner).describe(sql);
673                Box::pin(execute_instrumented(
674                    fut, cx, start, state.metrics, metric_attrs,
675                ))
676            }
677        }
678    };
679}
680
681// ---------------------------------------------------------------------------
682// Executor impls for each wrapper type
683// ---------------------------------------------------------------------------
684
685impl_executor!(&'_ crate::Pool<DB>, self => &self.inner);
686impl_executor!(&'c mut crate::PoolConnection<DB>, self => self.inner.as_mut());
687impl_executor!(&'c mut crate::Transaction<'_, DB>, self => &mut *self.inner);
688
689// Annotated wrappers – same instrumentation with per-query annotations threaded through.
690impl_executor!(
691    crate::annotations::Annotated<'c, crate::Pool<DB>>,
692    self => &self.inner.inner,
693    annotations: Some(&self.annotations)
694);
695impl_executor!(
696    crate::annotations::AnnotatedMut<'c, crate::PoolConnection<DB>>,
697    self => self.inner.inner.as_mut(),
698    annotations: Some(&self.annotations)
699);
700impl_executor!(
701    crate::annotations::AnnotatedMut<'c, crate::Transaction<'_, DB>>,
702    self => &mut *self.inner.inner,
703    annotations: Some(&self.annotations)
704);
705
706#[cfg(test)]
707mod tests {
708    use super::*;
709    use crate::attributes::ConnectionAttributes;
710
711    #[test]
712    fn error_type_classification() {
713        // Unit variants.
714        assert_eq!(error_type(&sqlx::Error::RowNotFound), "RowNotFound");
715        assert_eq!(error_type(&sqlx::Error::PoolTimedOut), "PoolTimedOut");
716        assert_eq!(error_type(&sqlx::Error::PoolClosed), "PoolClosed");
717        assert_eq!(error_type(&sqlx::Error::WorkerCrashed), "WorkerCrashed");
718
719        // String / boxed-error variants.
720        assert_eq!(
721            error_type(&sqlx::Error::Configuration("bad".into())),
722            "Configuration"
723        );
724        assert_eq!(
725            error_type(&sqlx::Error::Io(std::io::Error::other("test"))),
726            "Io"
727        );
728        assert_eq!(error_type(&sqlx::Error::Tls("tls".into())), "Tls");
729        assert_eq!(
730            error_type(&sqlx::Error::Protocol("proto".into())),
731            "Protocol"
732        );
733        assert_eq!(error_type(&sqlx::Error::Decode("dec".into())), "Decode");
734        assert_eq!(
735            error_type(&sqlx::Error::AnyDriverError("any".into())),
736            "AnyDriverError"
737        );
738
739        // Struct variants.
740        assert_eq!(
741            error_type(&sqlx::Error::ColumnNotFound("x".into())),
742            "ColumnNotFound"
743        );
744        assert_eq!(
745            error_type(&sqlx::Error::ColumnIndexOutOfBounds { index: 5, len: 3 }),
746            "ColumnIndexOutOfBounds"
747        );
748        assert_eq!(
749            error_type(&sqlx::Error::ColumnDecode {
750                index: "0".into(),
751                source: "bad".into(),
752            }),
753            "ColumnDecode"
754        );
755        assert_eq!(
756            error_type(&sqlx::Error::TypeNotFound {
757                type_name: "Foo".into(),
758            }),
759            "TypeNotFound"
760        );
761
762        // Migrate variant (behind sqlx's "migrate" default feature).
763        assert_eq!(
764            error_type(&sqlx::Error::Migrate(Box::new(
765                sqlx::migrate::MigrateError::Execute(sqlx::Error::Protocol("test".into()))
766            ))),
767            "Migrate"
768        );
769
770        // The `_ => "Unknown"` branch covers future sqlx::Error variants that may be
771        // added in newer sqlx releases. It cannot be tested directly since we cannot
772        // construct an unknown variant, but it ensures forward compatibility.
773    }
774
775    /// `InstrumentedStream::poll_next`'s `error_recorded` guard prevents `record_error`
776    /// from running more than once when the underlying stream yields multiple `Err`s
777    /// before terminating. Without the guard, the metric's attribute slice would
778    /// accumulate duplicate `error.type` (and `db.response.status_code`) `KeyValue`s,
779    /// producing a malformed histogram data point. Driven directly via a mock stream so
780    /// the assertion does not depend on backend stream-termination semantics.
781    #[test]
782    fn instrumented_stream_records_error_only_once_when_polled_past_err() {
783        use futures::StreamExt as _;
784        use futures::executor::block_on;
785        use futures::stream;
786
787        let metrics = std::sync::Arc::new(crate::metrics::Metrics::new());
788        let metric_attrs = vec![KeyValue::new(attribute::DB_SYSTEM_NAME, "postgresql")];
789        let (cx, start) = start_span("test", Vec::new());
790
791        // Yield two distinct `Err`s back-to-back, then `None`.
792        let inner = stream::iter(vec![
793            Err::<u64, _>(sqlx::Error::ColumnNotFound("x".into())),
794            Err(sqlx::Error::ColumnNotFound("y".into())),
795        ]);
796        let mut s = InstrumentedStream::<_, CountAll>::new(inner, cx, start, metrics, metric_attrs);
797
798        block_on(async {
799            assert!(matches!(s.next().await, Some(Err(_))), "expected first Err");
800            assert!(
801                matches!(s.next().await, Some(Err(_))),
802                "expected second Err"
803            );
804            assert!(s.next().await.is_none(), "expected stream to terminate");
805        });
806
807        let error_type_count = s
808            .metric_attrs
809            .iter()
810            .filter(|kv| kv.key.as_str() == "error.type")
811            .count();
812        assert_eq!(
813            error_type_count, 1,
814            "error.type must appear exactly once even when the stream yields multiple Err items",
815        );
816        assert!(
817            s.error_recorded,
818            "error_recorded should latch true after the first Err",
819        );
820    }
821
822    fn test_attrs() -> ConnectionAttributes {
823        ConnectionAttributes {
824            system: "postgresql",
825            host: Some("localhost".into()),
826            port: Some(5432),
827            namespace: Some("mydb".into()),
828            network_peer_address: None,
829            network_peer_port: None,
830            network_protocol_name: None,
831            network_transport: None,
832            pool_name: None,
833            query_text_mode: QueryTextMode::Full,
834        }
835    }
836
837    // ===========================================================================
838    // query text
839    // ===========================================================================
840
841    #[test]
842    fn build_attributes_with_full_query_text() {
843        let attrs = test_attrs();
844        let kv = build_attributes(&attrs, Some("SELECT 1"), None);
845        let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
846        assert!(keys.contains(&"db.query.text"));
847    }
848
849    #[test]
850    fn build_attributes_with_off_query_text() {
851        let mut attrs = test_attrs();
852        attrs.query_text_mode = QueryTextMode::Off;
853        let kv = build_attributes(&attrs, Some("SELECT 1"), None);
854        let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
855        assert!(!keys.contains(&"db.query.text"));
856    }
857
858    #[test]
859    fn build_attributes_obfuscated_replaces_literals() {
860        let mut attrs = test_attrs();
861        attrs.query_text_mode = QueryTextMode::Obfuscated;
862        let kv = build_attributes(
863            &attrs,
864            Some("INSERT INTO t (id, name) VALUES (1, 'alice')"),
865            None,
866        );
867        let text = kv
868            .iter()
869            .find(|k| k.key.as_str() == "db.query.text")
870            .map(|k| k.value.clone());
871        assert_eq!(
872            text,
873            Some(opentelemetry::Value::String(
874                "INSERT INTO t (id, name) VALUES (?, ?)".into()
875            ))
876        );
877    }
878
879    // ===========================================================================
880    // annotations
881    // ===========================================================================
882
883    #[test]
884    fn build_attributes_no_sql_no_annotations() {
885        let attrs = test_attrs();
886        let kv = build_attributes(&attrs, None, None);
887        let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
888        assert!(!keys.contains(&"db.query.text"));
889        assert!(!keys.contains(&"db.operation.name"));
890        assert!(!keys.contains(&"db.collection.name"));
891        assert!(!keys.contains(&"db.query.summary"));
892        assert!(!keys.contains(&"db.stored_procedure.name"));
893        assert!(keys.contains(&"db.system.name"));
894    }
895
896    #[test]
897    fn build_attributes_with_all_annotation_fields() {
898        let attrs = test_attrs();
899        let ann = QueryAnnotations::new()
900            .operation("SELECT")
901            .collection("users")
902            .query_summary("SELECT users")
903            .stored_procedure("sp_get");
904        let kv = build_attributes(&attrs, Some("SELECT * FROM users"), Some(&ann));
905        let find = |key: &str| {
906            kv.iter()
907                .find(|k| k.key.as_str() == key)
908                .map(|k| k.value.clone())
909        };
910        assert_eq!(
911            find("db.operation.name"),
912            Some(opentelemetry::Value::String("SELECT".into()))
913        );
914        assert_eq!(
915            find("db.collection.name"),
916            Some(opentelemetry::Value::String("users".into()))
917        );
918        assert_eq!(
919            find("db.query.summary"),
920            Some(opentelemetry::Value::String("SELECT users".into()))
921        );
922        assert_eq!(
923            find("db.stored_procedure.name"),
924            Some(opentelemetry::Value::String("sp_get".into()))
925        );
926        assert_eq!(
927            find("db.query.text"),
928            Some(opentelemetry::Value::String("SELECT * FROM users".into()))
929        );
930    }
931
932    #[test]
933    fn append_annotation_attrs_pushes_all_four_when_set() {
934        let ann = QueryAnnotations::new()
935            .operation("SELECT")
936            .collection("users")
937            .query_summary("users by id")
938            .stored_procedure("sp_get_users");
939        let mut kv = Vec::new();
940        append_annotation_attrs(&mut kv, Some(&ann));
941        let pairs: Vec<(&str, &opentelemetry::Value)> =
942            kv.iter().map(|k| (k.key.as_str(), &k.value)).collect();
943        assert_eq!(pairs.len(), 4, "expected one push per annotation field");
944        assert!(pairs.contains(&(
945            "db.operation.name",
946            &opentelemetry::Value::String("SELECT".into())
947        )));
948        assert!(pairs.contains(&(
949            "db.collection.name",
950            &opentelemetry::Value::String("users".into())
951        )));
952        assert!(pairs.contains(&(
953            "db.query.summary",
954            &opentelemetry::Value::String("users by id".into())
955        )));
956        assert!(pairs.contains(&(
957            "db.stored_procedure.name",
958            &opentelemetry::Value::String("sp_get_users".into())
959        )));
960    }
961
962    #[test]
963    fn append_annotation_attrs_none_pushes_nothing() {
964        let mut kv = Vec::new();
965        append_annotation_attrs(&mut kv, None);
966        assert!(kv.is_empty(), "no pushes expected when annotations is None");
967    }
968
969    #[test]
970    fn append_annotation_attrs_default_pushes_nothing() {
971        let mut kv = Vec::new();
972        append_annotation_attrs(&mut kv, Some(&QueryAnnotations::new()));
973        assert!(
974            kv.is_empty(),
975            "no pushes expected when every annotation field is None"
976        );
977    }
978
979    #[test]
980    fn build_attributes_annotation_field_permutations() {
981        type Setter = fn(QueryAnnotations) -> QueryAnnotations;
982
983        let attrs = test_attrs();
984        let fields: &[(&str, Setter)] = &[
985            ("db.operation.name", |a| a.operation("SELECT")),
986            ("db.collection.name", |a| a.collection("users")),
987            ("db.query.summary", |a| a.query_summary("SELECT users")),
988            ("db.stored_procedure.name", |a| a.stored_procedure("sp")),
989        ];
990
991        // Verify every permutation (2^4 = 16) of the four annotation fields: each field that is
992        // `Some` must appear in the output, and each field that is `None` must be absent.
993        for mask in 0u8..16 {
994            let mut ann = QueryAnnotations::new();
995            for (i, &(_, setter)) in fields.iter().enumerate() {
996                if mask & (1 << i) != 0 {
997                    ann = setter(ann);
998                }
999            }
1000            let kv = build_attributes(&attrs, None, Some(&ann));
1001            let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
1002            for (i, &(key, _)) in fields.iter().enumerate() {
1003                println!(
1004                    "mask: {:08b}, field: {}, key: {}; contains: {}",
1005                    mask,
1006                    i,
1007                    key,
1008                    keys.contains(&key)
1009                );
1010                if mask & (1 << i) != 0 {
1011                    assert!(
1012                        keys.contains(&key),
1013                        "{key} should be present for mask {mask:#06b}"
1014                    );
1015                } else {
1016                    assert!(
1017                        !keys.contains(&key),
1018                        "{key} should be absent for mask {mask:#06b}"
1019                    );
1020                }
1021            }
1022        }
1023    }
1024
1025    use proptest::prelude::*;
1026
1027    /// Build a `ConnectionAttributes` from explicit option fields. Used by the proptest
1028    /// strategies below so that each generated case exercises an arbitrary subset of the
1029    /// optional connection-level fields.
1030    fn make_connection_attributes(
1031        host: Option<String>,
1032        port: Option<u16>,
1033        namespace: Option<String>,
1034        network_peer_address: Option<String>,
1035        network_peer_port: Option<u16>,
1036        query_text_mode: QueryTextMode,
1037    ) -> ConnectionAttributes {
1038        ConnectionAttributes {
1039            system: "postgresql",
1040            host,
1041            port,
1042            namespace,
1043            network_peer_address,
1044            network_peer_port,
1045            network_protocol_name: None,
1046            network_transport: None,
1047            pool_name: None,
1048            query_text_mode,
1049        }
1050    }
1051
1052    /// Strategy for the three `QueryTextMode` variants.
1053    fn any_query_text_mode() -> impl Strategy<Value = QueryTextMode> {
1054        prop_oneof![
1055            Just(QueryTextMode::Full),
1056            Just(QueryTextMode::Obfuscated),
1057            Just(QueryTextMode::Off),
1058        ]
1059    }
1060
1061    /// Sentinel embedded inside marked literals for the chain no-leak proptest. Mirrors
1062    /// the constant in `obfuscate::tests::proptests` so a single failure mode (a literal
1063    /// kind escaping redaction) is detected through both the standalone `obfuscate`
1064    /// invariants and the executor-level chain invariants. The chain generators below
1065    /// intentionally duplicate the token shapes from `obfuscate::tests::proptests` and
1066    /// `compact::tests::proptests`; if a token shape needs adjusting, mirror the change
1067    /// in all three modules so the chain invariants stay honest.
1068    const CHAIN_SENTINEL: &str = "XSECRETX";
1069
1070    /// Minimal fragment generator for the chain proptests: covers the token kinds whose
1071    /// composition through `obfuscate -> compact_whitespace` exercises every region of
1072    /// both state machines. Bodies are alphabetic-and-digit so the sentinel cannot
1073    /// accidentally appear in a non-literal token.
1074    fn chain_fragment_any() -> impl Strategy<Value = String> {
1075        let token = prop_oneof![
1076            "[a-z_][a-z0-9_]{0,7}".prop_map(String::from),
1077            "[ \t\n]{0,5}".prop_map(String::from),
1078            "[a-z0-9 _]{0,8}".prop_map(|inner| format!("'{inner}'")),
1079            "[a-z0-9 _]{0,8}".prop_map(|inner| format!("\"{inner}\"")),
1080            (
1081                "[a-z_]{0,3}".prop_map(String::from),
1082                "[a-z0-9 _]{0,8}".prop_map(String::from),
1083            )
1084                .prop_map(|(tag, body)| format!("${tag}${body}${tag}$")),
1085            "[a-z0-9 _]{0,12}".prop_map(|inner| format!("--{inner}\n")),
1086            "[a-z0-9 _]{0,12}".prop_map(|inner| format!("/*{inner}*/")),
1087            "[0-9]{1,5}".prop_map(String::from),
1088            prop::sample::select(vec![",", ";", "=", "(", ")", "+", "*", "?"])
1089                .prop_map(String::from),
1090        ];
1091        prop::collection::vec(token, 0..12).prop_map(|tokens| tokens.concat())
1092    }
1093
1094    /// Marked-literal fragment generator: every string and dollar-quoted body embeds the
1095    /// sentinel. Surrounding tokens never contain the sentinel because their bodies are
1096    /// lowercase-only. If any literal kind is not redacted by `obfuscate`, the sentinel
1097    /// leaks through to the chain output.
1098    fn chain_fragment_marked() -> impl Strategy<Value = String> {
1099        let token = prop_oneof![
1100            "[a-z_][a-z0-9_]{0,7}".prop_map(String::from),
1101            "[ \t\n]{0,5}".prop_map(String::from),
1102            Just(format!("'{CHAIN_SENTINEL}'")),
1103            "[a-z_]{0,3}".prop_map(|tag| format!("${tag}${CHAIN_SENTINEL}${tag}$")),
1104            prop::sample::select(vec![",", ";", "=", "(", ")"]).prop_map(String::from),
1105        ];
1106        prop::collection::vec(token, 0..10).prop_map(|tokens| tokens.concat())
1107    }
1108
1109    /// Strategy for an arbitrary `QueryAnnotations` whose four fields are independently
1110    /// `None` or `Some(s)` for a bounded-length string `s`.
1111    fn any_annotations() -> impl Strategy<Value = QueryAnnotations> {
1112        (
1113            proptest::option::of(".{0,32}"),
1114            proptest::option::of(".{0,32}"),
1115            proptest::option::of(".{0,32}"),
1116            proptest::option::of(".{0,32}"),
1117        )
1118            .prop_map(|(op, coll, summary, sp)| {
1119                let mut ann = QueryAnnotations::new();
1120                if let Some(s) = op {
1121                    ann = ann.operation(s);
1122                }
1123                if let Some(s) = coll {
1124                    ann = ann.collection(s);
1125                }
1126                if let Some(s) = summary {
1127                    ann = ann.query_summary(s);
1128                }
1129                if let Some(s) = sp {
1130                    ann = ann.stored_procedure(s);
1131                }
1132                ann
1133            })
1134    }
1135
1136    proptest! {
1137        #![proptest_config(ProptestConfig::with_cases(128))]
1138
1139        /// Membership invariant: the keys emitted by `build_attributes` are exactly the
1140        /// union of the base connection keys, the four annotation keys (each iff its
1141        /// field is `Some`), and `db.query.text` (iff `sql.is_some()` and the mode is
1142        /// not `Off`).
1143        #[test]
1144        fn build_attributes_membership_invariant(
1145            host in proptest::option::of("[a-z]{1,16}"),
1146            port in proptest::option::of(any::<u16>()),
1147            namespace in proptest::option::of("[a-z]{1,16}"),
1148            network_peer_address in proptest::option::of("[0-9.:]{1,32}"),
1149            network_peer_port in proptest::option::of(any::<u16>()),
1150            mode in any_query_text_mode(),
1151            sql in proptest::option::of(".{0,64}"),
1152            ann in any_annotations(),
1153        ) {
1154            let attrs = make_connection_attributes(
1155                host.clone(), port, namespace.clone(),
1156                network_peer_address.clone(), network_peer_port, mode,
1157            );
1158            let kv = build_attributes(&attrs, sql.as_deref(), Some(&ann));
1159            let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
1160
1161            // `db.system.name` is always present.
1162            prop_assert!(keys.contains(&"db.system.name"));
1163
1164            // Optional connection keys appear iff their field is `Some`.
1165            prop_assert_eq!(keys.contains(&"server.address"), host.is_some());
1166            prop_assert_eq!(keys.contains(&"server.port"), port.is_some());
1167            prop_assert_eq!(keys.contains(&"db.namespace"), namespace.is_some());
1168            prop_assert_eq!(keys.contains(&"network.peer.address"), network_peer_address.is_some());
1169            prop_assert_eq!(keys.contains(&"network.peer.port"), network_peer_port.is_some());
1170
1171            // Annotation keys appear iff their field is `Some`.
1172            prop_assert_eq!(keys.contains(&"db.operation.name"), ann.operation.is_some());
1173            prop_assert_eq!(keys.contains(&"db.collection.name"), ann.collection.is_some());
1174            prop_assert_eq!(keys.contains(&"db.query.summary"), ann.query_summary.is_some());
1175            prop_assert_eq!(keys.contains(&"db.stored_procedure.name"), ann.stored_procedure.is_some());
1176
1177            // `db.query.text` is emitted iff sql is provided and mode is not Off.
1178            let expect_query_text = sql.is_some() && mode != QueryTextMode::Off;
1179            prop_assert_eq!(keys.contains(&"db.query.text"), expect_query_text);
1180        }
1181
1182        /// No key appears more than once in the emitted attribute list. Duplicate keys
1183        /// would cause downstream OTel exporters to emit conflicting tag values.
1184        #[test]
1185        fn build_attributes_has_no_duplicate_keys(
1186            host in proptest::option::of("[a-z]{1,16}"),
1187            port in proptest::option::of(any::<u16>()),
1188            namespace in proptest::option::of("[a-z]{1,16}"),
1189            mode in any_query_text_mode(),
1190            sql in proptest::option::of(".{0,64}"),
1191            ann in any_annotations(),
1192        ) {
1193            let attrs = make_connection_attributes(host, port, namespace, None, None, mode);
1194            let kv = build_attributes(&attrs, sql.as_deref(), Some(&ann));
1195            let mut seen = std::collections::HashSet::new();
1196            for k in &kv {
1197                prop_assert!(
1198                    seen.insert(k.key.as_str().to_owned()),
1199                    "duplicate key in build_attributes output: {}",
1200                    k.key.as_str(),
1201                );
1202            }
1203        }
1204
1205        /// `build_attributes` does not panic on arbitrary unicode SQL across all three
1206        /// query-text modes, including the obfuscated path that delegates into
1207        /// `obfuscate::obfuscate`.
1208        #[test]
1209        fn build_attributes_no_panic_arbitrary_sql(
1210            sql in proptest::option::of(any::<String>()),
1211            mode in any_query_text_mode(),
1212            ann in any_annotations(),
1213        ) {
1214            let attrs = make_connection_attributes(None, None, None, None, None, mode);
1215            let _ = build_attributes(&attrs, sql.as_deref(), Some(&ann));
1216        }
1217
1218        /// When `annotations` is `None`, no annotation keys appear in the output
1219        /// regardless of any other input – the `if let Some(ann)` guard short-circuits
1220        /// the entire annotation-emission block.
1221        #[test]
1222        fn build_attributes_no_annotations_emits_no_annotation_keys(
1223            mode in any_query_text_mode(),
1224            sql in proptest::option::of(".{0,64}"),
1225        ) {
1226            let attrs = make_connection_attributes(None, None, None, None, None, mode);
1227            let kv = build_attributes(&attrs, sql.as_deref(), None);
1228            let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
1229            prop_assert!(!keys.contains(&"db.operation.name"));
1230            prop_assert!(!keys.contains(&"db.collection.name"));
1231            prop_assert!(!keys.contains(&"db.query.summary"));
1232            prop_assert!(!keys.contains(&"db.stored_procedure.name"));
1233        }
1234
1235        /// Chain idempotence for the `Obfuscated` arm pipeline:
1236        /// `compact_whitespace(obfuscate(s))` is a fixed point. Both passes are
1237        /// individually idempotent (proven in their own modules); their composition must
1238        /// also be – running the chain twice produces the same string as running it once.
1239        #[test]
1240        fn chain_compact_obfuscate_idempotent(s in chain_fragment_any()) {
1241            let f = |x: &str| crate::compact::compact_whitespace(&crate::obfuscate::obfuscate(x));
1242            let once = f(&s);
1243            let twice = f(&once);
1244            prop_assert_eq!(once, twice);
1245        }
1246
1247        /// No-leak through chain: every literal in the input embeds the sentinel
1248        /// `XSECRETX`. After `compact_whitespace(obfuscate(s))`, the sentinel must be
1249        /// gone – otherwise some literal kind escaped redaction or the compaction step
1250        /// introduced a path that re-exposed redacted bytes.
1251        #[test]
1252        fn chain_compact_obfuscate_no_leak(s in chain_fragment_marked()) {
1253            let f = |x: &str| crate::compact::compact_whitespace(&crate::obfuscate::obfuscate(x));
1254            let out = f(&s);
1255            prop_assert!(
1256                !out.contains("XSECRETX"),
1257                "sentinel leaked through chain: input={s:?} output={out:?}"
1258            );
1259        }
1260
1261        /// Trim invariant on the emitted `db.query.text`: for both `Full` and
1262        /// `Obfuscated` modes, the captured value never starts or ends with `' '`.
1263        /// Trailing `'\n'` is permitted (line-comment terminator); only `' '` is
1264        /// forbidden as a leading or trailing byte.
1265        #[test]
1266        fn chain_emitted_query_text_trim_invariant(
1267            sql in any::<String>(),
1268            mode in prop_oneof![
1269                Just(QueryTextMode::Full),
1270                Just(QueryTextMode::Obfuscated),
1271            ],
1272        ) {
1273            let attrs = make_connection_attributes(None, None, None, None, None, mode);
1274            let kv = build_attributes(&attrs, Some(&sql), None);
1275            let value = kv
1276                .iter()
1277                .find(|k| k.key.as_str() == "db.query.text")
1278                .map(|k| k.value.clone());
1279            // For Full/Obfuscated with sql=Some, db.query.text must be present. If the
1280            // key disappears or its value type drifts away from String, the assertions
1281            // below would silently pass – fail loudly instead so a future regression in
1282            // the dispatch site is caught.
1283            let value = value.expect("db.query.text must be emitted for Full/Obfuscated");
1284            let opentelemetry::Value::String(s) = value else {
1285                panic!("db.query.text must be a String value, got {value:?}");
1286            };
1287            let s = s.as_str();
1288            prop_assert!(!s.starts_with(' '), "leading space in db.query.text: {s:?}");
1289            prop_assert!(!s.ends_with(' '), "trailing space in db.query.text: {s:?}");
1290        }
1291
1292        /// `append_annotation_attrs` membership invariant: starting from an empty vector,
1293        /// the appended key set is exactly `{"db.operation.name" iff op.is_some(),
1294        /// "db.collection.name" iff coll.is_some(), "db.query.summary" iff
1295        /// query_summary.is_some(), "db.stored_procedure.name" iff
1296        /// stored_procedure.is_some()}` – and nothing else, in particular none of the
1297        /// connection or query-text keys leak through.
1298        #[test]
1299        fn append_annotation_attrs_membership_invariant(ann in any_annotations()) {
1300            let mut kv = Vec::new();
1301            append_annotation_attrs(&mut kv, Some(&ann));
1302            let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
1303
1304            prop_assert_eq!(keys.contains(&"db.operation.name"), ann.operation.is_some());
1305            prop_assert_eq!(keys.contains(&"db.collection.name"), ann.collection.is_some());
1306            prop_assert_eq!(keys.contains(&"db.query.summary"), ann.query_summary.is_some());
1307            prop_assert_eq!(
1308                keys.contains(&"db.stored_procedure.name"),
1309                ann.stored_procedure.is_some(),
1310            );
1311
1312            // No connection or query-text keys leak in from a stray copy-paste of
1313            // `build_attributes` semantics.
1314            prop_assert!(!keys.contains(&"db.system.name"));
1315            prop_assert!(!keys.contains(&"db.namespace"));
1316            prop_assert!(!keys.contains(&"db.query.text"));
1317
1318            // Cardinality matches the count of `Some` annotation fields.
1319            let expected_count = usize::from(ann.operation.is_some())
1320                + usize::from(ann.collection.is_some())
1321                + usize::from(ann.query_summary.is_some())
1322                + usize::from(ann.stored_procedure.is_some());
1323            prop_assert_eq!(kv.len(), expected_count);
1324        }
1325    }
1326}