1use futures::stream::BoxStream;
85use sqlx::query::{Map, Query, QueryAs, QueryScalar};
86
87use crate::annotations::{Annotated, AnnotatedMut, QueryAnnotations};
88use crate::database::Database;
89
90mod sealed {
94 pub trait Sealed {}
96}
97
98pub trait QueryAnnotateExt: sealed::Sealed + Sized {
123 fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self>;
128
129 fn with_operation(
134 self,
135 operation: impl Into<String>,
136 collection: impl Into<String>,
137 ) -> AnnotatedQuery<Self> {
138 self.with_annotations(
139 QueryAnnotations::new()
140 .operation(operation)
141 .collection(collection),
142 )
143 }
144}
145
146impl<DB: sqlx::Database, A> sealed::Sealed for Query<'_, DB, A> {}
147impl<DB: sqlx::Database, A> QueryAnnotateExt for Query<'_, DB, A> {
148 fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self> {
149 AnnotatedQuery {
150 inner: self,
151 annotations,
152 }
153 }
154}
155
156impl<DB: sqlx::Database, O, A> sealed::Sealed for QueryAs<'_, DB, O, A> {}
157impl<DB: sqlx::Database, O, A> QueryAnnotateExt for QueryAs<'_, DB, O, A> {
158 fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self> {
159 AnnotatedQuery {
160 inner: self,
161 annotations,
162 }
163 }
164}
165
166impl<DB: sqlx::Database, O, A> sealed::Sealed for QueryScalar<'_, DB, O, A> {}
167impl<DB: sqlx::Database, O, A> QueryAnnotateExt for QueryScalar<'_, DB, O, A> {
168 fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self> {
169 AnnotatedQuery {
170 inner: self,
171 annotations,
172 }
173 }
174}
175
176impl<DB: sqlx::Database, F, A> sealed::Sealed for Map<'_, DB, F, A> {}
177impl<DB: sqlx::Database, F, A> QueryAnnotateExt for Map<'_, DB, F, A> {
178 fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self> {
179 AnnotatedQuery {
180 inner: self,
181 annotations,
182 }
183 }
184}
185
186#[must_use = "annotated queries do nothing until you call execute / fetch* on them"]
195pub struct AnnotatedQuery<Q> {
196 inner: Q,
197 annotations: QueryAnnotations,
198}
199
200impl<Q> std::fmt::Debug for AnnotatedQuery<Q> {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 f.debug_struct("AnnotatedQuery")
203 .field("annotations", &self.annotations)
204 .finish_non_exhaustive()
205 }
206}
207
208#[doc(hidden)]
227pub trait IntoAnnotatedExecutor<'e, DB: Database> {
228 type Wrapper: sqlx::Executor<'e, Database = DB>;
230
231 fn into_annotated(self, annotations: QueryAnnotations) -> Self::Wrapper;
233}
234
235impl<'e, DB> IntoAnnotatedExecutor<'e, DB> for &'e crate::Pool<DB>
236where
237 DB: Database,
238 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
239{
240 type Wrapper = Annotated<'e, crate::Pool<DB>>;
241
242 fn into_annotated(self, annotations: QueryAnnotations) -> Self::Wrapper {
243 Annotated {
244 inner: self,
245 annotations,
246 state: self.state.clone(),
247 }
248 }
249}
250
251impl<'e, DB> IntoAnnotatedExecutor<'e, DB> for &'e mut crate::PoolConnection<DB>
252where
253 DB: Database,
254 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
255{
256 type Wrapper = AnnotatedMut<'e, crate::PoolConnection<DB>>;
257
258 fn into_annotated(self, annotations: QueryAnnotations) -> Self::Wrapper {
259 AnnotatedMut {
260 state: self.state.clone(),
261 annotations,
262 inner: self,
263 }
264 }
265}
266
267impl<'e, 'tx, DB> IntoAnnotatedExecutor<'e, DB> for &'e mut crate::Transaction<'tx, DB>
268where
269 DB: Database,
270 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
271{
272 type Wrapper = AnnotatedMut<'e, crate::Transaction<'tx, DB>>;
273
274 fn into_annotated(self, annotations: QueryAnnotations) -> Self::Wrapper {
275 AnnotatedMut {
276 state: self.state.clone(),
277 annotations,
278 inner: self,
279 }
280 }
281}
282
283macro_rules! impl_annotated_query_fetch_forwarders {
304 (row = $row:ty, extra_bounds = ($($extra_bounds:tt)*)) => {
305 pub fn with_annotations(mut self, annotations: QueryAnnotations) -> Self {
308 self.annotations = annotations;
309 self
310 }
311
312 pub fn with_operation(
315 self,
316 operation: impl Into<String>,
317 collection: impl Into<String>,
318 ) -> Self {
319 self.with_annotations(
320 QueryAnnotations::new()
321 .operation(operation)
322 .collection(collection),
323 )
324 }
325
326 pub fn fetch<'e, E>(self, executor: E) -> BoxStream<'e, Result<$row, sqlx::Error>>
328 where
329 'q: 'e,
330 A: 'e,
331 $($extra_bounds)*
332 E: 'e + IntoAnnotatedExecutor<'e, DB>,
333 {
334 let wrapper = executor.into_annotated(self.annotations);
335 self.inner.fetch(wrapper)
336 }
337
338 #[allow(deprecated, clippy::type_complexity)]
343 pub fn fetch_many<'e, E>(
344 self,
345 executor: E,
346 ) -> BoxStream<'e, Result<sqlx::Either<DB::QueryResult, $row>, sqlx::Error>>
347 where
348 'q: 'e,
349 A: 'e,
350 $($extra_bounds)*
351 E: 'e + IntoAnnotatedExecutor<'e, DB>,
352 {
353 let wrapper = executor.into_annotated(self.annotations);
354 self.inner.fetch_many(wrapper)
355 }
356
357 pub async fn fetch_all<'e, E>(self, executor: E) -> Result<Vec<$row>, sqlx::Error>
364 where
365 'q: 'e,
366 A: 'e,
367 $($extra_bounds)*
368 E: 'e + IntoAnnotatedExecutor<'e, DB>,
369 {
370 let wrapper = executor.into_annotated(self.annotations);
371 self.inner.fetch_all(wrapper).await
372 }
373
374 pub async fn fetch_one<'e, E>(self, executor: E) -> Result<$row, sqlx::Error>
381 where
382 'q: 'e,
383 A: 'e,
384 $($extra_bounds)*
385 E: 'e + IntoAnnotatedExecutor<'e, DB>,
386 {
387 let wrapper = executor.into_annotated(self.annotations);
388 self.inner.fetch_one(wrapper).await
389 }
390
391 pub async fn fetch_optional<'e, E>(
397 self,
398 executor: E,
399 ) -> Result<Option<$row>, sqlx::Error>
400 where
401 'q: 'e,
402 A: 'e,
403 $($extra_bounds)*
404 E: 'e + IntoAnnotatedExecutor<'e, DB>,
405 {
406 let wrapper = executor.into_annotated(self.annotations);
407 self.inner.fetch_optional(wrapper).await
408 }
409 };
410}
411
412macro_rules! impl_annotated_query_bind {
417 () => {
418 pub fn bind<T>(mut self, value: T) -> Self
421 where
422 T: 'q + sqlx::Encode<'q, DB> + sqlx::Type<DB>,
423 {
424 self.inner = self.inner.bind(value);
425 self
426 }
427 };
428}
429
430impl<'q, DB, A> AnnotatedQuery<Query<'q, DB, A>>
433where
434 DB: Database,
435 A: 'q + Send + sqlx::IntoArguments<'q, DB>,
436{
437 impl_annotated_query_fetch_forwarders!(row = DB::Row, extra_bounds = ());
438
439 pub async fn execute<'e, E>(self, executor: E) -> Result<DB::QueryResult, sqlx::Error>
446 where
447 'q: 'e,
448 A: 'e,
449 E: 'e + IntoAnnotatedExecutor<'e, DB>,
450 {
451 let wrapper = executor.into_annotated(self.annotations);
452 self.inner.execute(wrapper).await
453 }
454
455 #[allow(deprecated)]
461 pub async fn execute_many<'e, E>(
462 self,
463 executor: E,
464 ) -> BoxStream<'e, Result<DB::QueryResult, sqlx::Error>>
465 where
466 'q: 'e,
467 A: 'e,
468 E: 'e + IntoAnnotatedExecutor<'e, DB>,
469 {
470 let wrapper = executor.into_annotated(self.annotations);
471 self.inner.execute_many(wrapper).await
472 }
473
474 #[allow(clippy::type_complexity)]
478 pub fn map<F, O>(
479 self,
480 f: F,
481 ) -> AnnotatedQuery<Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx::Error> + Send, A>>
482 where
483 F: FnMut(DB::Row) -> O + Send,
484 O: Unpin,
485 {
486 AnnotatedQuery {
487 inner: self.inner.map(f),
488 annotations: self.annotations,
489 }
490 }
491
492 pub fn try_map<F, O>(self, f: F) -> AnnotatedQuery<Map<'q, DB, F, A>>
495 where
496 F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
497 O: Unpin,
498 {
499 AnnotatedQuery {
500 inner: self.inner.try_map(f),
501 annotations: self.annotations,
502 }
503 }
504}
505
506impl<'q, DB> AnnotatedQuery<Query<'q, DB, <DB as sqlx::Database>::Arguments<'q>>>
507where
508 DB: sqlx::Database,
509{
510 impl_annotated_query_bind!();
511}
512
513impl<'q, DB, O, A> AnnotatedQuery<QueryAs<'q, DB, O, A>>
516where
517 DB: Database,
518 A: 'q + Send + sqlx::IntoArguments<'q, DB>,
519 O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
520{
521 impl_annotated_query_fetch_forwarders!(row = O, extra_bounds = (DB: 'e, O: 'e,));
522}
523
524impl<'q, DB, O> AnnotatedQuery<QueryAs<'q, DB, O, <DB as sqlx::Database>::Arguments<'q>>>
525where
526 DB: sqlx::Database,
527{
528 impl_annotated_query_bind!();
529}
530
531impl<'q, DB, O, A> AnnotatedQuery<QueryScalar<'q, DB, O, A>>
534where
535 DB: Database,
536 A: 'q + Send + sqlx::IntoArguments<'q, DB>,
537 O: Send + Unpin,
538 (O,): Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
539{
540 impl_annotated_query_fetch_forwarders!(row = O, extra_bounds = (DB: 'e, O: 'e,));
541}
542
543impl<'q, DB, O> AnnotatedQuery<QueryScalar<'q, DB, O, <DB as sqlx::Database>::Arguments<'q>>>
544where
545 DB: sqlx::Database,
546{
547 impl_annotated_query_bind!();
548}
549
550impl<'q, DB, F, A, O> AnnotatedQuery<Map<'q, DB, F, A>>
553where
554 DB: Database,
555 F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
556 O: Send + Unpin,
557 A: 'q + Send + sqlx::IntoArguments<'q, DB>,
558{
559 impl_annotated_query_fetch_forwarders!(row = O, extra_bounds = (DB: 'e, F: 'e, O: 'e,));
560
561 #[allow(clippy::type_complexity)]
565 pub fn map<G, P>(
566 self,
567 g: G,
568 ) -> AnnotatedQuery<Map<'q, DB, impl FnMut(DB::Row) -> Result<P, sqlx::Error> + Send, A>>
569 where
570 G: FnMut(O) -> P + Send,
571 P: Unpin,
572 {
573 AnnotatedQuery {
574 inner: self.inner.map(g),
575 annotations: self.annotations,
576 }
577 }
578
579 #[allow(clippy::type_complexity)]
582 pub fn try_map<G, P>(
583 self,
584 g: G,
585 ) -> AnnotatedQuery<Map<'q, DB, impl FnMut(DB::Row) -> Result<P, sqlx::Error> + Send, A>>
586 where
587 G: FnMut(O) -> Result<P, sqlx::Error> + Send,
588 P: Unpin,
589 {
590 AnnotatedQuery {
591 inner: self.inner.try_map(g),
592 annotations: self.annotations,
593 }
594 }
595}
596
597#[cfg(all(test, feature = "sqlite"))]
598mod tests {
599 use sqlx::Execute as _;
600 use sqlx::Sqlite;
601
602 use super::*;
603
604 #[test]
607 fn with_annotations_replaces_previous() {
608 let q = sqlx::query::<Sqlite>("SELECT 1")
609 .with_annotations(QueryAnnotations::new().operation("FIRST"))
610 .with_annotations(QueryAnnotations::new().operation("SECOND"));
611 assert_eq!(q.annotations.operation.as_deref(), Some("SECOND"));
612 }
613
614 #[test]
615 fn with_operation_sets_both_fields() {
616 let q = sqlx::query::<Sqlite>("SELECT 1").with_operation("SELECT", "users");
617 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
618 assert_eq!(q.annotations.collection.as_deref(), Some("users"));
619 assert!(q.annotations.query_summary.is_none());
620 assert!(q.annotations.stored_procedure.is_none());
621 }
622
623 #[test]
624 fn with_operation_replaces_previous_annotations() {
625 let q = sqlx::query::<Sqlite>("SELECT 1")
627 .with_annotations(QueryAnnotations::new().query_summary("legacy summary"))
628 .with_operation("SELECT", "users");
629 assert!(
630 q.annotations.query_summary.is_none(),
631 "with_operation must replace, not merge"
632 );
633 }
634
635 #[test]
636 fn debug_impl_includes_annotations() {
637 let q = sqlx::query::<Sqlite>("SELECT 1")
638 .with_annotations(QueryAnnotations::new().operation("DEBUG_OP"));
639 let debug = format!("{q:?}");
640 assert!(debug.contains("AnnotatedQuery"));
641 assert!(debug.contains("DEBUG_OP"));
642 }
643
644 #[test]
645 fn bind_then_with_annotations_compose() {
646 let q = sqlx::query::<Sqlite>("SELECT ?1, ?2")
647 .bind(1_i32)
648 .with_annotations(QueryAnnotations::new().operation("SELECT"));
649 assert_eq!(q.inner.sql(), "SELECT ?1, ?2");
650 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
651 }
652
653 #[test]
654 fn with_annotations_first_then_bind_compose() {
655 let q = sqlx::query::<Sqlite>("SELECT ?1, ?2")
656 .with_annotations(QueryAnnotations::new().operation("SELECT"))
657 .bind(1_i32);
658 assert_eq!(q.inner.sql(), "SELECT ?1, ?2");
659 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
660 }
661
662 #[test]
663 fn query_as_supports_with_annotations() {
664 let q = sqlx::query_as::<Sqlite, (i32,)>("SELECT 1").with_operation("SELECT", "users");
665 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
666 assert_eq!(q.annotations.collection.as_deref(), Some("users"));
667 }
668
669 #[test]
670 fn query_as_wrapper_with_annotations_replaces() {
671 let q = sqlx::query_as::<Sqlite, (i32,)>("SELECT 1")
673 .with_annotations(QueryAnnotations::new().operation("FIRST"))
674 .with_annotations(QueryAnnotations::new().operation("SECOND"));
675 assert_eq!(q.annotations.operation.as_deref(), Some("SECOND"));
676 }
677
678 #[test]
679 fn query_as_wrapper_with_operation_chains_on_wrapper() {
680 let q = sqlx::query_as::<Sqlite, (i32,)>("SELECT 1")
683 .with_annotations(QueryAnnotations::new().query_summary("legacy"))
684 .with_operation("SELECT", "users");
685 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
686 assert_eq!(q.annotations.collection.as_deref(), Some("users"));
687 assert!(q.annotations.query_summary.is_none());
688 }
689
690 #[test]
691 fn query_as_wrapper_bind_after_annotations() {
692 let q = sqlx::query_as::<Sqlite, (i32,)>("SELECT ?1")
693 .with_annotations(QueryAnnotations::new().operation("SELECT"))
694 .bind(7_i32);
695 assert_eq!(q.inner.sql(), "SELECT ?1");
696 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
697 }
698
699 #[test]
700 fn query_scalar_supports_with_annotations() {
701 let q = sqlx::query_scalar::<Sqlite, i32>("SELECT 1").with_operation("SELECT", "users");
702 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
703 assert_eq!(q.annotations.collection.as_deref(), Some("users"));
704 }
705
706 #[test]
707 fn query_scalar_wrapper_with_annotations_replaces() {
708 let q = sqlx::query_scalar::<Sqlite, i32>("SELECT 1")
709 .with_annotations(QueryAnnotations::new().operation("FIRST"))
710 .with_annotations(QueryAnnotations::new().operation("SECOND"));
711 assert_eq!(q.annotations.operation.as_deref(), Some("SECOND"));
712 }
713
714 #[test]
715 fn query_scalar_wrapper_with_operation_chains_on_wrapper() {
716 let q = sqlx::query_scalar::<Sqlite, i32>("SELECT 1")
717 .with_annotations(QueryAnnotations::new().query_summary("legacy"))
718 .with_operation("SELECT", "users");
719 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
720 assert_eq!(q.annotations.collection.as_deref(), Some("users"));
721 assert!(q.annotations.query_summary.is_none());
722 }
723
724 #[test]
725 fn query_scalar_wrapper_bind_after_annotations() {
726 let q = sqlx::query_scalar::<Sqlite, i32>("SELECT ?1")
727 .with_annotations(QueryAnnotations::new().operation("SELECT"))
728 .bind(7_i32);
729 assert_eq!(q.inner.sql(), "SELECT ?1");
730 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
731 }
732
733 #[test]
736 fn query_with_annotations_map_preserves_annotations() {
737 let q = sqlx::query::<Sqlite>("SELECT 1")
739 .with_annotations(QueryAnnotations::new().operation("SELECT"))
740 .map(|_row: sqlx::sqlite::SqliteRow| 42_i64);
741 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
742 }
743
744 #[test]
745 fn query_bind_with_annotations_map_preserves_annotations() {
746 let q = sqlx::query::<Sqlite>("SELECT ?1")
748 .bind(1_i32)
749 .with_annotations(QueryAnnotations::new().operation("SELECT"))
750 .map(|_row: sqlx::sqlite::SqliteRow| 42_i64);
751 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
752 }
753
754 #[test]
755 fn query_map_with_annotations_replaces_previous() {
756 let q = sqlx::query::<Sqlite>("SELECT 1")
758 .map(|_row: sqlx::sqlite::SqliteRow| 42_i64)
759 .with_annotations(QueryAnnotations::new().operation("FIRST"))
760 .with_annotations(QueryAnnotations::new().operation("SECOND"));
761 assert_eq!(q.annotations.operation.as_deref(), Some("SECOND"));
762 }
763
764 #[test]
765 fn query_try_map_with_annotations_compose() {
766 let q = sqlx::query::<Sqlite>("SELECT 1")
768 .try_map(|_row: sqlx::sqlite::SqliteRow| Ok::<_, sqlx::Error>(42_i64))
769 .with_operation("SELECT", "users");
770 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
771 assert_eq!(q.annotations.collection.as_deref(), Some("users"));
772 }
773
774 #[test]
775 fn annotated_query_try_map_preserves_annotations() {
776 let q = sqlx::query::<Sqlite>("SELECT 1")
779 .with_annotations(QueryAnnotations::new().operation("SELECT"))
780 .try_map(|_row: sqlx::sqlite::SqliteRow| Ok::<_, sqlx::Error>(42_i64));
781 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
782 }
783
784 #[test]
785 fn map_compose_after_annotations() {
786 let q = sqlx::query::<Sqlite>("SELECT 1")
788 .with_annotations(QueryAnnotations::new().operation("SELECT"))
789 .map(|_row: sqlx::sqlite::SqliteRow| 1_i64)
790 .map(|n| n + 1);
791 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
792 }
793
794 #[test]
795 fn map_then_with_operation_replaces_via_wrapper() {
796 let q = sqlx::query::<Sqlite>("SELECT 1")
798 .map(|_row: sqlx::sqlite::SqliteRow| 1_i64)
799 .with_annotations(QueryAnnotations::new().query_summary("legacy"))
800 .with_operation("SELECT", "users");
801 assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
802 assert_eq!(q.annotations.collection.as_deref(), Some("users"));
803 assert!(q.annotations.query_summary.is_none());
804 }
805
806 #[test]
807 fn debug_impl_for_annotated_map_includes_annotations() {
808 let q = sqlx::query::<Sqlite>("SELECT 1")
810 .map(|_row: sqlx::sqlite::SqliteRow| 1_i64)
811 .with_annotations(QueryAnnotations::new().operation("DEBUG_MAP"));
812 let debug = format!("{q:?}");
813 assert!(debug.contains("AnnotatedQuery"));
814 assert!(debug.contains("DEBUG_MAP"));
815 }
816}