1use std::borrow::Cow;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use futures::Stream;
7use futures::stream::BoxStream;
8use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
9use opentelemetry::{Context as OtelContext, KeyValue};
10use opentelemetry_semantic_conventions::attribute;
11
12use crate::annotations::QueryAnnotations;
13use crate::attributes::{self, ConnectionAttributes, QueryTextMode};
14use crate::database::Database;
15use crate::metrics::Metrics;
16
17fn append_annotation_attrs(kv: &mut Vec<KeyValue>, annotations: Option<&QueryAnnotations>) {
27 let Some(ann) = annotations else { return };
28 if let Some(ref op) = ann.operation {
29 kv.push(KeyValue::new(attribute::DB_OPERATION_NAME, op.clone()));
30 }
31 if let Some(ref coll) = ann.collection {
32 kv.push(KeyValue::new(attribute::DB_COLLECTION_NAME, coll.clone()));
33 }
34 if let Some(ref summary) = ann.query_summary {
35 kv.push(KeyValue::new(attribute::DB_QUERY_SUMMARY, summary.clone()));
36 }
37 if let Some(ref sp) = ann.stored_procedure {
38 kv.push(KeyValue::new(
39 attribute::DB_STORED_PROCEDURE_NAME,
40 sp.clone(),
41 ));
42 }
43}
44
45fn build_attributes(
51 attrs: &ConnectionAttributes,
52 sql: Option<&str>,
53 annotations: Option<&QueryAnnotations>,
54) -> Vec<KeyValue> {
55 let mut kv = attrs.base_key_values();
56 append_annotation_attrs(&mut kv, annotations);
57 if let Some(sql) = sql {
58 match attrs.query_text_mode {
59 QueryTextMode::Full => {
60 kv.push(KeyValue::new(
61 attribute::DB_QUERY_TEXT,
62 crate::compact::compact_whitespace(sql),
63 ));
64 }
65 QueryTextMode::Obfuscated => {
66 let obfuscated = crate::obfuscate::obfuscate(sql);
67 kv.push(KeyValue::new(
68 attribute::DB_QUERY_TEXT,
69 crate::compact::compact_whitespace(&obfuscated),
70 ));
71 }
72 QueryTextMode::Off => {}
73 }
74 }
75 kv
76}
77
78fn start_span(name: &str, span_attrs: Vec<KeyValue>) -> (OtelContext, Instant) {
80 let tracer = opentelemetry::global::tracer("sqlx-otel");
81 let span = tracer
82 .span_builder(name.to_owned())
83 .with_kind(SpanKind::Client)
84 .with_attributes(span_attrs)
85 .start(&tracer);
86 let cx = OtelContext::current_with_span(span);
87 (cx, Instant::now())
88}
89
90fn begin_query_span(
103 attrs: &ConnectionAttributes,
104 sql: Option<&str>,
105 annotations: Option<&QueryAnnotations>,
106) -> (OtelContext, Instant, Vec<KeyValue>) {
107 let (op, coll, summary) = annotations.map_or((None, None, None), |a| {
108 (
109 a.operation.as_deref(),
110 a.collection.as_deref(),
111 a.query_summary.as_deref(),
112 )
113 });
114 let name = attributes::span_name(attrs.system, op, coll, summary);
115 let span_attrs = build_attributes(attrs, sql, annotations);
116 let mut metric_attrs = attrs.base_key_values();
117 append_annotation_attrs(&mut metric_attrs, annotations);
118 let (cx, start) = start_span(&name, span_attrs);
119 (cx, start, metric_attrs)
120}
121
122fn error_type(err: &sqlx::Error) -> &'static str {
124 match err {
125 sqlx::Error::Configuration(_) => "Configuration",
126 sqlx::Error::Database(_) => "Database",
127 sqlx::Error::Io(_) => "Io",
128 sqlx::Error::Tls(_) => "Tls",
129 sqlx::Error::Protocol(_) => "Protocol",
130 sqlx::Error::RowNotFound => "RowNotFound",
131 sqlx::Error::TypeNotFound { .. } => "TypeNotFound",
132 sqlx::Error::ColumnIndexOutOfBounds { .. } => "ColumnIndexOutOfBounds",
133 sqlx::Error::ColumnNotFound(_) => "ColumnNotFound",
134 sqlx::Error::ColumnDecode { .. } => "ColumnDecode",
135 sqlx::Error::Decode(_) => "Decode",
136 sqlx::Error::AnyDriverError(_) => "AnyDriverError",
137 sqlx::Error::PoolTimedOut => "PoolTimedOut",
138 sqlx::Error::PoolClosed => "PoolClosed",
139 sqlx::Error::WorkerCrashed => "WorkerCrashed",
140 sqlx::Error::Migrate(_) => "Migrate",
141 _ => "Unknown",
142 }
143}
144
145fn record_error(cx: &OtelContext, err: &sqlx::Error, metric_attrs: &mut Vec<KeyValue>) {
151 let span = cx.span();
152 let kind = error_type(err);
153 span.set_status(Status::Error {
154 description: Cow::Owned(err.to_string()),
155 });
156 span.set_attribute(KeyValue::new(attribute::ERROR_TYPE, kind));
157 metric_attrs.push(KeyValue::new(attribute::ERROR_TYPE, kind));
158 if let sqlx::Error::Database(db_err) = err {
160 if let Some(code) = db_err.code() {
161 let code = code.into_owned();
162 span.set_attribute(KeyValue::new(
163 attribute::DB_RESPONSE_STATUS_CODE,
164 code.clone(),
165 ));
166 metric_attrs.push(KeyValue::new(attribute::DB_RESPONSE_STATUS_CODE, code));
167 }
168 }
169 span.add_event(
170 "exception",
171 vec![
172 KeyValue::new("exception.type", kind),
173 KeyValue::new("exception.message", err.to_string()),
174 ],
175 );
176}
177
178fn record_rows(cx: &OtelContext, rows: u64) {
180 cx.span().set_attribute(KeyValue::new(
181 attribute::DB_RESPONSE_RETURNED_ROWS,
182 i64::try_from(rows).unwrap_or(i64::MAX),
183 ));
184}
185
186fn record_affected_rows(cx: &OtelContext, rows: u64) {
188 cx.span().set_attribute(KeyValue::new(
189 "db.response.affected_rows",
190 i64::try_from(rows).unwrap_or(i64::MAX),
191 ));
192}
193
194fn finish(
198 cx: &OtelContext,
199 start: Instant,
200 returned_rows: Option<u64>,
201 affected_rows: Option<u64>,
202 metrics: &Metrics,
203 attrs: &[KeyValue],
204) {
205 cx.span().end();
206 metrics.record(start.elapsed(), returned_rows, affected_rows, attrs);
207}
208
209async fn execute_instrumented<T>(
212 fut: futures::future::BoxFuture<'_, Result<T, sqlx::Error>>,
213 cx: OtelContext,
214 start: Instant,
215 metrics: std::sync::Arc<Metrics>,
216 mut metric_attrs: Vec<KeyValue>,
217) -> Result<T, sqlx::Error> {
218 let result = fut.await;
219 if let Err(err) = &result {
220 record_error(&cx, err, &mut metric_attrs);
221 }
222 finish(&cx, start, None, None, &metrics, &metric_attrs);
223 result
224}
225
/// Compile-time policy deciding how much a successfully yielded stream item
/// contributes to the row total kept by `InstrumentedStream`.
trait RowCounter<T> {
    /// Returns the number of rows `item` accounts for.
    fn count(item: &T) -> u64;
}
235
/// Row-counting policy that treats every item as exactly one row.
struct CountAll;

impl<T> RowCounter<T> for CountAll {
    /// Always returns 1, regardless of the item.
    fn count(_item: &T) -> u64 {
        1
    }
}
244
/// Row-counting policy for `sqlx::Either` items: only the `Right` side counts.
struct CountRight;

impl<L, R> RowCounter<sqlx::Either<L, R>> for CountRight {
    /// Returns 1 when `item` is the right variant and 0 otherwise.
    fn count(item: &sqlx::Either<L, R>) -> u64 {
        u64::from(item.is_right())
    }
}
254
/// Row-counting policy that never counts an item as a row.
struct CountNone;

impl<T> RowCounter<T> for CountNone {
    /// Always returns 0, regardless of the item.
    fn count(_item: &T) -> u64 {
        0
    }
}
263
/// Stream wrapper that keeps a span open while the inner stream is consumed.
///
/// `S` is the wrapped stream and `C` is the `RowCounter` policy used to tally
/// rows from the items it yields.
struct InstrumentedStream<S, C> {
    // The wrapped stream being polled.
    inner: S,
    // Context holding the span for this operation.
    cx: OtelContext,
    // Instant from which the operation's duration is measured.
    start: Instant,
    // Running total accumulated from `C::count` over successful items.
    rows: u64,
    // Sink for the metrics recorded on completion.
    metrics: std::sync::Arc<Metrics>,
    // Attributes attached to the metrics; error details are appended on failure.
    metric_attrs: Vec<KeyValue>,
    // Latches to `true` after the first error so it is recorded only once.
    error_recorded: bool,
    // Latches to `true` once `complete` has run so it finalises only once.
    finished: bool,
    // Carries the counter policy type without storing a value of it.
    _counter: std::marker::PhantomData<C>,
}
277
278impl<S, C> InstrumentedStream<S, C> {
279 fn new(
280 inner: S,
281 cx: OtelContext,
282 start: Instant,
283 metrics: std::sync::Arc<Metrics>,
284 metric_attrs: Vec<KeyValue>,
285 ) -> Self {
286 Self {
287 inner,
288 cx,
289 start,
290 rows: 0,
291 metrics,
292 metric_attrs,
293 error_recorded: false,
294 finished: false,
295 _counter: std::marker::PhantomData,
296 }
297 }
298
299 fn complete(&mut self) {
300 if !self.finished {
301 self.finished = true;
302 record_rows(&self.cx, self.rows);
303 finish(
304 &self.cx,
305 self.start,
306 Some(self.rows),
307 None,
308 &self.metrics,
309 &self.metric_attrs,
310 );
311 }
312 }
313}
314
// Declares the wrapper `Unpin` whenever the inner stream is, with no bound on
// the counter type `C` (which is only held through `PhantomData`).
impl<S: Unpin, C> Unpin for InstrumentedStream<S, C> {}
318
319impl<S, T, C> Stream for InstrumentedStream<S, C>
320where
321 S: Stream<Item = Result<T, sqlx::Error>> + Unpin,
322 C: RowCounter<T>,
323{
324 type Item = Result<T, sqlx::Error>;
325
326 fn poll_next(mut self: Pin<&mut Self>, task_cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
327 match Pin::new(&mut self.inner).poll_next(task_cx) {
328 Poll::Ready(Some(Ok(item))) => {
329 self.rows += C::count(&item);
330 Poll::Ready(Some(Ok(item)))
331 }
332 Poll::Ready(Some(Err(err))) => {
333 if !self.error_recorded {
334 self.error_recorded = true;
335 let this = &mut *self;
341 record_error(&this.cx, &err, &mut this.metric_attrs);
342 }
343 Poll::Ready(Some(Err(err)))
344 }
345 Poll::Ready(None) => {
346 self.complete();
347 Poll::Ready(None)
348 }
349 Poll::Pending => Poll::Pending,
350 }
351 }
352}
353
// Finalises on drop so a stream that is dropped before reaching its end still
// ends its span and records metrics. `complete` is a no-op if already run.
impl<S, C> Drop for InstrumentedStream<S, C> {
    fn drop(&mut self) {
        self.complete();
    }
}
359
/// Implements `sqlx::Executor` for an instrumented wrapper type.
///
/// Invocation forms:
///
/// * `impl_executor!(Type, self => inner_expr)` instruments without annotations.
/// * `impl_executor!(Type, self => inner_expr, annotations: ann_expr)`
///   passes `ann_expr` (an `Option<&QueryAnnotations>`) to every span.
///
/// `inner_expr` must evaluate to the underlying sqlx executor, and the wrapper
/// must expose a `state` with `attrs` and `metrics`. Every generated method
/// opens a span via `begin_query_span`, delegates to the inner executor, and
/// finalises the span and metrics when the future or stream completes.
macro_rules! impl_executor {
    // Without annotations: forward to the shared arm with `None`.
    ($ty:ty, $self_:ident => $inner:expr) => {
        impl_executor!(@impl $ty, $self_ => $inner, None);
    };
    // With annotations: forward the caller's expression unchanged.
    ($ty:ty, $self_:ident => $inner:expr, annotations: $ann:expr) => {
        impl_executor!(@impl $ty, $self_ => $inner, $ann);
    };
    // Shared implementation arm.
    (@impl $ty:ty, $self_:ident => $inner:expr, $ann:expr) => {
        impl<'c, DB> sqlx::Executor<'c> for $ty
        where
            DB: Database,
            for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
        {
            type Database = DB;

            // Runs the statement and records the affected-row count on
            // success, or the error on failure.
            fn execute<'e, 'q: 'e, E>(
                $self_,
                query: E,
            ) -> futures::future::BoxFuture<
                'e,
                Result<<DB as sqlx::Database>::QueryResult, sqlx::Error>,
            >
            where
                E: 'q + sqlx::Execute<'q, DB>,
                'c: 'e,
            {
                let state = $self_.state.clone();
                // The SQL text is only needed while the span is being built,
                // so it is borrowed rather than copied.
                let (cx, start, mut metric_attrs) =
                    begin_query_span(&state.attrs, Some(query.sql()), $ann);
                let fut = ($inner).execute(query);
                Box::pin(async move {
                    let result = fut.await;
                    let affected = match &result {
                        Ok(qr) => {
                            let n = DB::rows_affected(qr);
                            record_affected_rows(&cx, n);
                            Some(n)
                        }
                        Err(err) => {
                            record_error(&cx, err, &mut metric_attrs);
                            None
                        }
                    };
                    finish(&cx, start, None, affected, &state.metrics, &metric_attrs);
                    result
                })
            }

            // Streams query results; items are never counted as rows.
            fn execute_many<'e, 'q: 'e, E>(
                $self_,
                query: E,
            ) -> BoxStream<'e, Result<<DB as sqlx::Database>::QueryResult, sqlx::Error>>
            where
                E: 'q + sqlx::Execute<'q, DB>,
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, metric_attrs) =
                    begin_query_span(&state.attrs, Some(query.sql()), $ann);
                let stream = ($inner).execute_many(query);
                Box::pin(InstrumentedStream::<_, CountNone>::new(
                    stream,
                    cx,
                    start,
                    state.metrics,
                    metric_attrs,
                ))
            }

            // Streams rows; every yielded row counts as one.
            fn fetch<'e, 'q: 'e, E>(
                $self_,
                query: E,
            ) -> BoxStream<'e, Result<<DB as sqlx::Database>::Row, sqlx::Error>>
            where
                E: 'q + sqlx::Execute<'q, DB>,
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, metric_attrs) =
                    begin_query_span(&state.attrs, Some(query.sql()), $ann);
                let stream = ($inner).fetch(query);
                Box::pin(InstrumentedStream::<_, CountAll>::new(
                    stream,
                    cx,
                    start,
                    state.metrics,
                    metric_attrs,
                ))
            }

            // Streams a mix of query results and rows; only the `Right`
            // (row) items are counted.
            fn fetch_many<'e, 'q: 'e, E>(
                $self_,
                query: E,
            ) -> BoxStream<
                'e,
                Result<
                    sqlx::Either<
                        <DB as sqlx::Database>::QueryResult,
                        <DB as sqlx::Database>::Row,
                    >,
                    sqlx::Error,
                >,
            >
            where
                E: 'q + sqlx::Execute<'q, DB>,
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, metric_attrs) =
                    begin_query_span(&state.attrs, Some(query.sql()), $ann);
                let stream = ($inner).fetch_many(query);
                Box::pin(InstrumentedStream::<_, CountRight>::new(
                    stream,
                    cx,
                    start,
                    state.metrics,
                    metric_attrs,
                ))
            }

            // Collects all rows and records how many were returned.
            fn fetch_all<'e, 'q: 'e, E>(
                $self_,
                query: E,
            ) -> futures::future::BoxFuture<
                'e,
                Result<Vec<<DB as sqlx::Database>::Row>, sqlx::Error>,
            >
            where
                E: 'q + sqlx::Execute<'q, DB>,
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, mut metric_attrs) =
                    begin_query_span(&state.attrs, Some(query.sql()), $ann);
                let fut = ($inner).fetch_all(query);
                Box::pin(async move {
                    let result = fut.await;
                    match &result {
                        Ok(rows) => {
                            // Clamp instead of casting, matching the other
                            // row-count conversions in this file.
                            let count = u64::try_from(rows.len()).unwrap_or(u64::MAX);
                            record_rows(&cx, count);
                            finish(&cx, start, Some(count), None, &state.metrics, &metric_attrs);
                        }
                        Err(err) => {
                            record_error(&cx, err, &mut metric_attrs);
                            finish(&cx, start, None, None, &state.metrics, &metric_attrs);
                        }
                    }
                    result
                })
            }

            // Fetches exactly one row; success records one returned row.
            fn fetch_one<'e, 'q: 'e, E>(
                $self_,
                query: E,
            ) -> futures::future::BoxFuture<
                'e,
                Result<<DB as sqlx::Database>::Row, sqlx::Error>,
            >
            where
                E: 'q + sqlx::Execute<'q, DB>,
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, mut metric_attrs) =
                    begin_query_span(&state.attrs, Some(query.sql()), $ann);
                let fut = ($inner).fetch_one(query);
                Box::pin(async move {
                    let result = fut.await;
                    match &result {
                        Ok(_) => {
                            record_rows(&cx, 1);
                            finish(&cx, start, Some(1), None, &state.metrics, &metric_attrs);
                        }
                        Err(err) => {
                            record_error(&cx, err, &mut metric_attrs);
                            finish(&cx, start, None, None, &state.metrics, &metric_attrs);
                        }
                    }
                    result
                })
            }

            // Fetches at most one row; records 1 or 0 returned rows.
            fn fetch_optional<'e, 'q: 'e, E>(
                $self_,
                query: E,
            ) -> futures::future::BoxFuture<
                'e,
                Result<Option<<DB as sqlx::Database>::Row>, sqlx::Error>,
            >
            where
                E: 'q + sqlx::Execute<'q, DB>,
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, mut metric_attrs) =
                    begin_query_span(&state.attrs, Some(query.sql()), $ann);
                let fut = ($inner).fetch_optional(query);
                Box::pin(async move {
                    let result = fut.await;
                    match &result {
                        Ok(maybe_row) => {
                            let count = u64::from(maybe_row.is_some());
                            record_rows(&cx, count);
                            finish(&cx, start, Some(count), None, &state.metrics, &metric_attrs);
                        }
                        Err(err) => {
                            record_error(&cx, err, &mut metric_attrs);
                            finish(&cx, start, None, None, &state.metrics, &metric_attrs);
                        }
                    }
                    result
                })
            }

            // Prepares a statement; no row counts are recorded.
            fn prepare<'e, 'q: 'e>(
                $self_,
                query: &'q str,
            ) -> futures::future::BoxFuture<
                'e,
                Result<<DB as sqlx::Database>::Statement<'q>, sqlx::Error>,
            >
            where
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, metric_attrs) = begin_query_span(&state.attrs, Some(query), $ann);
                let fut = ($inner).prepare(query);
                Box::pin(execute_instrumented(
                    fut, cx, start, state.metrics, metric_attrs,
                ))
            }

            // Prepares a statement with explicit parameter types; no row
            // counts are recorded.
            fn prepare_with<'e, 'q: 'e>(
                $self_,
                sql: &'q str,
                parameters: &'e [<DB as sqlx::Database>::TypeInfo],
            ) -> futures::future::BoxFuture<
                'e,
                Result<<DB as sqlx::Database>::Statement<'q>, sqlx::Error>,
            >
            where
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, metric_attrs) = begin_query_span(&state.attrs, Some(sql), $ann);
                let fut = ($inner).prepare_with(sql, parameters);
                Box::pin(execute_instrumented(
                    fut, cx, start, state.metrics, metric_attrs,
                ))
            }

            // Describes a statement; no row counts are recorded.
            #[doc(hidden)]
            fn describe<'e, 'q: 'e>(
                $self_,
                sql: &'q str,
            ) -> futures::future::BoxFuture<
                'e,
                Result<sqlx::Describe<DB>, sqlx::Error>,
            >
            where
                'c: 'e,
            {
                let state = $self_.state.clone();
                let (cx, start, metric_attrs) = begin_query_span(&state.attrs, Some(sql), $ann);
                let fut = ($inner).describe(sql);
                Box::pin(execute_instrumented(
                    fut, cx, start, state.metrics, metric_attrs,
                ))
            }
        }
    };
}
680
// Executor implementations without annotations: shared pool reference,
// exclusive pooled connection, and exclusive transaction.
impl_executor!(&'_ crate::Pool<DB>, self => &self.inner);
impl_executor!(&'c mut crate::PoolConnection<DB>, self => self.inner.as_mut());
impl_executor!(&'c mut crate::Transaction<'_, DB>, self => &mut *self.inner);

// Executor implementations for the annotated wrappers. Each passes its own
// `annotations` to every span it opens.
impl_executor!(
    crate::annotations::Annotated<'c, crate::Pool<DB>>,
    self => &self.inner.inner,
    annotations: Some(&self.annotations)
);
impl_executor!(
    crate::annotations::AnnotatedMut<'c, crate::PoolConnection<DB>>,
    self => self.inner.inner.as_mut(),
    annotations: Some(&self.annotations)
);
impl_executor!(
    crate::annotations::AnnotatedMut<'c, crate::Transaction<'_, DB>>,
    self => &mut *self.inner.inner,
    annotations: Some(&self.annotations)
);
705
706#[cfg(test)]
707mod tests {
708 use super::*;
709 use crate::attributes::ConnectionAttributes;
710
711 #[test]
712 fn error_type_classification() {
713 assert_eq!(error_type(&sqlx::Error::RowNotFound), "RowNotFound");
715 assert_eq!(error_type(&sqlx::Error::PoolTimedOut), "PoolTimedOut");
716 assert_eq!(error_type(&sqlx::Error::PoolClosed), "PoolClosed");
717 assert_eq!(error_type(&sqlx::Error::WorkerCrashed), "WorkerCrashed");
718
719 assert_eq!(
721 error_type(&sqlx::Error::Configuration("bad".into())),
722 "Configuration"
723 );
724 assert_eq!(
725 error_type(&sqlx::Error::Io(std::io::Error::other("test"))),
726 "Io"
727 );
728 assert_eq!(error_type(&sqlx::Error::Tls("tls".into())), "Tls");
729 assert_eq!(
730 error_type(&sqlx::Error::Protocol("proto".into())),
731 "Protocol"
732 );
733 assert_eq!(error_type(&sqlx::Error::Decode("dec".into())), "Decode");
734 assert_eq!(
735 error_type(&sqlx::Error::AnyDriverError("any".into())),
736 "AnyDriverError"
737 );
738
739 assert_eq!(
741 error_type(&sqlx::Error::ColumnNotFound("x".into())),
742 "ColumnNotFound"
743 );
744 assert_eq!(
745 error_type(&sqlx::Error::ColumnIndexOutOfBounds { index: 5, len: 3 }),
746 "ColumnIndexOutOfBounds"
747 );
748 assert_eq!(
749 error_type(&sqlx::Error::ColumnDecode {
750 index: "0".into(),
751 source: "bad".into(),
752 }),
753 "ColumnDecode"
754 );
755 assert_eq!(
756 error_type(&sqlx::Error::TypeNotFound {
757 type_name: "Foo".into(),
758 }),
759 "TypeNotFound"
760 );
761
762 assert_eq!(
764 error_type(&sqlx::Error::Migrate(Box::new(
765 sqlx::migrate::MigrateError::Execute(sqlx::Error::Protocol("test".into()))
766 ))),
767 "Migrate"
768 );
769
770 }
774
775 #[test]
782 fn instrumented_stream_records_error_only_once_when_polled_past_err() {
783 use futures::StreamExt as _;
784 use futures::executor::block_on;
785 use futures::stream;
786
787 let metrics = std::sync::Arc::new(crate::metrics::Metrics::new());
788 let metric_attrs = vec![KeyValue::new(attribute::DB_SYSTEM_NAME, "postgresql")];
789 let (cx, start) = start_span("test", Vec::new());
790
791 let inner = stream::iter(vec![
793 Err::<u64, _>(sqlx::Error::ColumnNotFound("x".into())),
794 Err(sqlx::Error::ColumnNotFound("y".into())),
795 ]);
796 let mut s = InstrumentedStream::<_, CountAll>::new(inner, cx, start, metrics, metric_attrs);
797
798 block_on(async {
799 assert!(matches!(s.next().await, Some(Err(_))), "expected first Err");
800 assert!(
801 matches!(s.next().await, Some(Err(_))),
802 "expected second Err"
803 );
804 assert!(s.next().await.is_none(), "expected stream to terminate");
805 });
806
807 let error_type_count = s
808 .metric_attrs
809 .iter()
810 .filter(|kv| kv.key.as_str() == "error.type")
811 .count();
812 assert_eq!(
813 error_type_count, 1,
814 "error.type must appear exactly once even when the stream yields multiple Err items",
815 );
816 assert!(
817 s.error_recorded,
818 "error_recorded should latch true after the first Err",
819 );
820 }
821
822 fn test_attrs() -> ConnectionAttributes {
823 ConnectionAttributes {
824 system: "postgresql",
825 host: Some("localhost".into()),
826 port: Some(5432),
827 namespace: Some("mydb".into()),
828 network_peer_address: None,
829 network_peer_port: None,
830 network_protocol_name: None,
831 network_transport: None,
832 pool_name: None,
833 query_text_mode: QueryTextMode::Full,
834 }
835 }
836
837 #[test]
842 fn build_attributes_with_full_query_text() {
843 let attrs = test_attrs();
844 let kv = build_attributes(&attrs, Some("SELECT 1"), None);
845 let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
846 assert!(keys.contains(&"db.query.text"));
847 }
848
849 #[test]
850 fn build_attributes_with_off_query_text() {
851 let mut attrs = test_attrs();
852 attrs.query_text_mode = QueryTextMode::Off;
853 let kv = build_attributes(&attrs, Some("SELECT 1"), None);
854 let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
855 assert!(!keys.contains(&"db.query.text"));
856 }
857
858 #[test]
859 fn build_attributes_obfuscated_replaces_literals() {
860 let mut attrs = test_attrs();
861 attrs.query_text_mode = QueryTextMode::Obfuscated;
862 let kv = build_attributes(
863 &attrs,
864 Some("INSERT INTO t (id, name) VALUES (1, 'alice')"),
865 None,
866 );
867 let text = kv
868 .iter()
869 .find(|k| k.key.as_str() == "db.query.text")
870 .map(|k| k.value.clone());
871 assert_eq!(
872 text,
873 Some(opentelemetry::Value::String(
874 "INSERT INTO t (id, name) VALUES (?, ?)".into()
875 ))
876 );
877 }
878
879 #[test]
884 fn build_attributes_no_sql_no_annotations() {
885 let attrs = test_attrs();
886 let kv = build_attributes(&attrs, None, None);
887 let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
888 assert!(!keys.contains(&"db.query.text"));
889 assert!(!keys.contains(&"db.operation.name"));
890 assert!(!keys.contains(&"db.collection.name"));
891 assert!(!keys.contains(&"db.query.summary"));
892 assert!(!keys.contains(&"db.stored_procedure.name"));
893 assert!(keys.contains(&"db.system.name"));
894 }
895
896 #[test]
897 fn build_attributes_with_all_annotation_fields() {
898 let attrs = test_attrs();
899 let ann = QueryAnnotations::new()
900 .operation("SELECT")
901 .collection("users")
902 .query_summary("SELECT users")
903 .stored_procedure("sp_get");
904 let kv = build_attributes(&attrs, Some("SELECT * FROM users"), Some(&ann));
905 let find = |key: &str| {
906 kv.iter()
907 .find(|k| k.key.as_str() == key)
908 .map(|k| k.value.clone())
909 };
910 assert_eq!(
911 find("db.operation.name"),
912 Some(opentelemetry::Value::String("SELECT".into()))
913 );
914 assert_eq!(
915 find("db.collection.name"),
916 Some(opentelemetry::Value::String("users".into()))
917 );
918 assert_eq!(
919 find("db.query.summary"),
920 Some(opentelemetry::Value::String("SELECT users".into()))
921 );
922 assert_eq!(
923 find("db.stored_procedure.name"),
924 Some(opentelemetry::Value::String("sp_get".into()))
925 );
926 assert_eq!(
927 find("db.query.text"),
928 Some(opentelemetry::Value::String("SELECT * FROM users".into()))
929 );
930 }
931
932 #[test]
933 fn append_annotation_attrs_pushes_all_four_when_set() {
934 let ann = QueryAnnotations::new()
935 .operation("SELECT")
936 .collection("users")
937 .query_summary("users by id")
938 .stored_procedure("sp_get_users");
939 let mut kv = Vec::new();
940 append_annotation_attrs(&mut kv, Some(&ann));
941 let pairs: Vec<(&str, &opentelemetry::Value)> =
942 kv.iter().map(|k| (k.key.as_str(), &k.value)).collect();
943 assert_eq!(pairs.len(), 4, "expected one push per annotation field");
944 assert!(pairs.contains(&(
945 "db.operation.name",
946 &opentelemetry::Value::String("SELECT".into())
947 )));
948 assert!(pairs.contains(&(
949 "db.collection.name",
950 &opentelemetry::Value::String("users".into())
951 )));
952 assert!(pairs.contains(&(
953 "db.query.summary",
954 &opentelemetry::Value::String("users by id".into())
955 )));
956 assert!(pairs.contains(&(
957 "db.stored_procedure.name",
958 &opentelemetry::Value::String("sp_get_users".into())
959 )));
960 }
961
    /// Passing `None` for the annotations must leave the vector empty.
    #[test]
    fn append_annotation_attrs_none_pushes_nothing() {
        let mut kv = Vec::new();
        append_annotation_attrs(&mut kv, None);
        assert!(kv.is_empty(), "no pushes expected when annotations is None");
    }
968
    /// A freshly constructed `QueryAnnotations` (no fields set) must not
    /// contribute any attribute.
    #[test]
    fn append_annotation_attrs_default_pushes_nothing() {
        let mut kv = Vec::new();
        append_annotation_attrs(&mut kv, Some(&QueryAnnotations::new()));
        assert!(
            kv.is_empty(),
            "no pushes expected when every annotation field is None"
        );
    }
978
979 #[test]
980 fn build_attributes_annotation_field_permutations() {
981 type Setter = fn(QueryAnnotations) -> QueryAnnotations;
982
983 let attrs = test_attrs();
984 let fields: &[(&str, Setter)] = &[
985 ("db.operation.name", |a| a.operation("SELECT")),
986 ("db.collection.name", |a| a.collection("users")),
987 ("db.query.summary", |a| a.query_summary("SELECT users")),
988 ("db.stored_procedure.name", |a| a.stored_procedure("sp")),
989 ];
990
991 for mask in 0u8..16 {
994 let mut ann = QueryAnnotations::new();
995 for (i, &(_, setter)) in fields.iter().enumerate() {
996 if mask & (1 << i) != 0 {
997 ann = setter(ann);
998 }
999 }
1000 let kv = build_attributes(&attrs, None, Some(&ann));
1001 let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
1002 for (i, &(key, _)) in fields.iter().enumerate() {
1003 println!(
1004 "mask: {:08b}, field: {}, key: {}; contains: {}",
1005 mask,
1006 i,
1007 key,
1008 keys.contains(&key)
1009 );
1010 if mask & (1 << i) != 0 {
1011 assert!(
1012 keys.contains(&key),
1013 "{key} should be present for mask {mask:#06b}"
1014 );
1015 } else {
1016 assert!(
1017 !keys.contains(&key),
1018 "{key} should be absent for mask {mask:#06b}"
1019 );
1020 }
1021 }
1022 }
1023 }
1024
1025 use proptest::prelude::*;
1026
    /// Builds `ConnectionAttributes` for the property tests.
    ///
    /// The system is fixed to `"postgresql"`, and the protocol name,
    /// transport and pool name are always `None`. All other fields come from
    /// the arguments.
    fn make_connection_attributes(
        host: Option<String>,
        port: Option<u16>,
        namespace: Option<String>,
        network_peer_address: Option<String>,
        network_peer_port: Option<u16>,
        query_text_mode: QueryTextMode,
    ) -> ConnectionAttributes {
        ConnectionAttributes {
            system: "postgresql",
            host,
            port,
            namespace,
            network_peer_address,
            network_peer_port,
            network_protocol_name: None,
            network_transport: None,
            pool_name: None,
            query_text_mode,
        }
    }
1051
    /// Strategy choosing among the `Full`, `Obfuscated` and `Off` query text
    /// modes.
    fn any_query_text_mode() -> impl Strategy<Value = QueryTextMode> {
        prop_oneof![
            Just(QueryTextMode::Full),
            Just(QueryTextMode::Obfuscated),
            Just(QueryTextMode::Off),
        ]
    }
1060
    // Marker text that `chain_fragment_marked` embeds inside quoted literals.
    // It contains uppercase letters, which the identifier tokens of that
    // generator (lowercase letters, digits, underscore) never produce.
    const CHAIN_SENTINEL: &str = "XSECRETX";
1069
    /// Strategy producing SQL-like text by concatenating up to 11 random
    /// tokens.
    ///
    /// Token kinds, in the order listed below: identifier-like words,
    /// whitespace runs, single-quoted bodies, double-quoted bodies,
    /// `$tag$body$tag$` sections, `--` line comments ending in a newline,
    /// `/* */` block comments, digit runs, and single punctuation characters.
    fn chain_fragment_any() -> impl Strategy<Value = String> {
        let token = prop_oneof![
            "[a-z_][a-z0-9_]{0,7}".prop_map(String::from),
            "[ \t\n]{0,5}".prop_map(String::from),
            "[a-z0-9 _]{0,8}".prop_map(|inner| format!("'{inner}'")),
            "[a-z0-9 _]{0,8}".prop_map(|inner| format!("\"{inner}\"")),
            (
                "[a-z_]{0,3}".prop_map(String::from),
                "[a-z0-9 _]{0,8}".prop_map(String::from),
            )
                .prop_map(|(tag, body)| format!("${tag}${body}${tag}$")),
            "[a-z0-9 _]{0,12}".prop_map(|inner| format!("--{inner}\n")),
            "[a-z0-9 _]{0,12}".prop_map(|inner| format!("/*{inner}*/")),
            "[0-9]{1,5}".prop_map(String::from),
            prop::sample::select(vec![",", ";", "=", "(", ")", "+", "*", "?"])
                .prop_map(String::from),
        ];
        prop::collection::vec(token, 0..12).prop_map(|tokens| tokens.concat())
    }
1093
    /// Strategy producing SQL-like text (up to 9 tokens) in which
    /// `CHAIN_SENTINEL` only ever appears inside a single-quoted literal or a
    /// `$tag$...$tag$` section.
    ///
    /// The remaining token kinds are identifier-like words, whitespace runs
    /// and single punctuation characters.
    fn chain_fragment_marked() -> impl Strategy<Value = String> {
        let token = prop_oneof![
            "[a-z_][a-z0-9_]{0,7}".prop_map(String::from),
            "[ \t\n]{0,5}".prop_map(String::from),
            Just(format!("'{CHAIN_SENTINEL}'")),
            "[a-z_]{0,3}".prop_map(|tag| format!("${tag}${CHAIN_SENTINEL}${tag}$")),
            prop::sample::select(vec![",", ";", "=", "(", ")"]).prop_map(String::from),
        ];
        prop::collection::vec(token, 0..10).prop_map(|tokens| tokens.concat())
    }
1108
1109 fn any_annotations() -> impl Strategy<Value = QueryAnnotations> {
1112 (
1113 proptest::option::of(".{0,32}"),
1114 proptest::option::of(".{0,32}"),
1115 proptest::option::of(".{0,32}"),
1116 proptest::option::of(".{0,32}"),
1117 )
1118 .prop_map(|(op, coll, summary, sp)| {
1119 let mut ann = QueryAnnotations::new();
1120 if let Some(s) = op {
1121 ann = ann.operation(s);
1122 }
1123 if let Some(s) = coll {
1124 ann = ann.collection(s);
1125 }
1126 if let Some(s) = summary {
1127 ann = ann.query_summary(s);
1128 }
1129 if let Some(s) = sp {
1130 ann = ann.stored_procedure(s);
1131 }
1132 ann
1133 })
1134 }
1135
1136 proptest! {
1137 #![proptest_config(ProptestConfig::with_cases(128))]
1138
1139 #[test]
1144 fn build_attributes_membership_invariant(
1145 host in proptest::option::of("[a-z]{1,16}"),
1146 port in proptest::option::of(any::<u16>()),
1147 namespace in proptest::option::of("[a-z]{1,16}"),
1148 network_peer_address in proptest::option::of("[0-9.:]{1,32}"),
1149 network_peer_port in proptest::option::of(any::<u16>()),
1150 mode in any_query_text_mode(),
1151 sql in proptest::option::of(".{0,64}"),
1152 ann in any_annotations(),
1153 ) {
1154 let attrs = make_connection_attributes(
1155 host.clone(), port, namespace.clone(),
1156 network_peer_address.clone(), network_peer_port, mode,
1157 );
1158 let kv = build_attributes(&attrs, sql.as_deref(), Some(&ann));
1159 let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
1160
1161 prop_assert!(keys.contains(&"db.system.name"));
1163
1164 prop_assert_eq!(keys.contains(&"server.address"), host.is_some());
1166 prop_assert_eq!(keys.contains(&"server.port"), port.is_some());
1167 prop_assert_eq!(keys.contains(&"db.namespace"), namespace.is_some());
1168 prop_assert_eq!(keys.contains(&"network.peer.address"), network_peer_address.is_some());
1169 prop_assert_eq!(keys.contains(&"network.peer.port"), network_peer_port.is_some());
1170
1171 prop_assert_eq!(keys.contains(&"db.operation.name"), ann.operation.is_some());
1173 prop_assert_eq!(keys.contains(&"db.collection.name"), ann.collection.is_some());
1174 prop_assert_eq!(keys.contains(&"db.query.summary"), ann.query_summary.is_some());
1175 prop_assert_eq!(keys.contains(&"db.stored_procedure.name"), ann.stored_procedure.is_some());
1176
1177 let expect_query_text = sql.is_some() && mode != QueryTextMode::Off;
1179 prop_assert_eq!(keys.contains(&"db.query.text"), expect_query_text);
1180 }
1181
1182 #[test]
1185 fn build_attributes_has_no_duplicate_keys(
1186 host in proptest::option::of("[a-z]{1,16}"),
1187 port in proptest::option::of(any::<u16>()),
1188 namespace in proptest::option::of("[a-z]{1,16}"),
1189 mode in any_query_text_mode(),
1190 sql in proptest::option::of(".{0,64}"),
1191 ann in any_annotations(),
1192 ) {
1193 let attrs = make_connection_attributes(host, port, namespace, None, None, mode);
1194 let kv = build_attributes(&attrs, sql.as_deref(), Some(&ann));
1195 let mut seen = std::collections::HashSet::new();
1196 for k in &kv {
1197 prop_assert!(
1198 seen.insert(k.key.as_str().to_owned()),
1199 "duplicate key in build_attributes output: {}",
1200 k.key.as_str(),
1201 );
1202 }
1203 }
1204
        // Smoke test: arbitrary strings as SQL, in any mode and with any
        // annotations, must not make `build_attributes` panic. The result is
        // deliberately discarded.
        #[test]
        fn build_attributes_no_panic_arbitrary_sql(
            sql in proptest::option::of(any::<String>()),
            mode in any_query_text_mode(),
            ann in any_annotations(),
        ) {
            let attrs = make_connection_attributes(None, None, None, None, None, mode);
            let _ = build_attributes(&attrs, sql.as_deref(), Some(&ann));
        }
1217
1218 #[test]
1222 fn build_attributes_no_annotations_emits_no_annotation_keys(
1223 mode in any_query_text_mode(),
1224 sql in proptest::option::of(".{0,64}"),
1225 ) {
1226 let attrs = make_connection_attributes(None, None, None, None, None, mode);
1227 let kv = build_attributes(&attrs, sql.as_deref(), None);
1228 let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
1229 prop_assert!(!keys.contains(&"db.operation.name"));
1230 prop_assert!(!keys.contains(&"db.collection.name"));
1231 prop_assert!(!keys.contains(&"db.query.summary"));
1232 prop_assert!(!keys.contains(&"db.stored_procedure.name"));
1233 }
1234
1235 #[test]
1240 fn chain_compact_obfuscate_idempotent(s in chain_fragment_any()) {
1241 let f = |x: &str| crate::compact::compact_whitespace(&crate::obfuscate::obfuscate(x));
1242 let once = f(&s);
1243 let twice = f(&once);
1244 prop_assert_eq!(once, twice);
1245 }
1246
1247 #[test]
1252 fn chain_compact_obfuscate_no_leak(s in chain_fragment_marked()) {
1253 let f = |x: &str| crate::compact::compact_whitespace(&crate::obfuscate::obfuscate(x));
1254 let out = f(&s);
1255 prop_assert!(
1256 !out.contains("XSECRETX"),
1257 "sentinel leaked through chain: input={s:?} output={out:?}"
1258 );
1259 }
1260
1261 #[test]
1266 fn chain_emitted_query_text_trim_invariant(
1267 sql in any::<String>(),
1268 mode in prop_oneof![
1269 Just(QueryTextMode::Full),
1270 Just(QueryTextMode::Obfuscated),
1271 ],
1272 ) {
1273 let attrs = make_connection_attributes(None, None, None, None, None, mode);
1274 let kv = build_attributes(&attrs, Some(&sql), None);
1275 let value = kv
1276 .iter()
1277 .find(|k| k.key.as_str() == "db.query.text")
1278 .map(|k| k.value.clone());
1279 let value = value.expect("db.query.text must be emitted for Full/Obfuscated");
1284 let opentelemetry::Value::String(s) = value else {
1285 panic!("db.query.text must be a String value, got {value:?}");
1286 };
1287 let s = s.as_str();
1288 prop_assert!(!s.starts_with(' '), "leading space in db.query.text: {s:?}");
1289 prop_assert!(!s.ends_with(' '), "trailing space in db.query.text: {s:?}");
1290 }
1291
1292 #[test]
1299 fn append_annotation_attrs_membership_invariant(ann in any_annotations()) {
1300 let mut kv = Vec::new();
1301 append_annotation_attrs(&mut kv, Some(&ann));
1302 let keys: Vec<&str> = kv.iter().map(|k| k.key.as_str()).collect();
1303
1304 prop_assert_eq!(keys.contains(&"db.operation.name"), ann.operation.is_some());
1305 prop_assert_eq!(keys.contains(&"db.collection.name"), ann.collection.is_some());
1306 prop_assert_eq!(keys.contains(&"db.query.summary"), ann.query_summary.is_some());
1307 prop_assert_eq!(
1308 keys.contains(&"db.stored_procedure.name"),
1309 ann.stored_procedure.is_some(),
1310 );
1311
1312 prop_assert!(!keys.contains(&"db.system.name"));
1315 prop_assert!(!keys.contains(&"db.namespace"));
1316 prop_assert!(!keys.contains(&"db.query.text"));
1317
1318 let expected_count = usize::from(ann.operation.is_some())
1320 + usize::from(ann.collection.is_some())
1321 + usize::from(ann.query_summary.is_some())
1322 + usize::from(ann.stored_procedure.is_some());
1323 prop_assert_eq!(kv.len(), expected_count);
1324 }
1325 }
1326}