use futures::stream::BoxStream;
use sqlx::query::{Map, Query, QueryAs, QueryScalar};
use crate::annotations::{Annotated, AnnotatedMut, QueryAnnotations};
use crate::database::Database;
// Private module holding the marker supertrait of `QueryAnnotateExt`.
// Because `sealed` is not `pub`, code outside this module (and its
// descendants) cannot name `Sealed`, and therefore cannot implement
// `QueryAnnotateExt` for additional types.
mod sealed {
/// Marker trait with no methods; implemented alongside every
/// `QueryAnnotateExt` impl in this file.
pub trait Sealed {}
}
/// Extension trait that attaches [`QueryAnnotations`] to a query value.
///
/// The `sealed::Sealed` supertrait restricts implementations to the types
/// that implement it in this file.
pub trait QueryAnnotateExt: sealed::Sealed + Sized {
    /// Consumes the query and wraps it, together with `annotations`, in an
    /// [`AnnotatedQuery`].
    fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self>;

    /// Shorthand for [`with_annotations`](Self::with_annotations) using a fresh
    /// [`QueryAnnotations`] on which only `operation` and `collection` are set.
    fn with_operation(
        self,
        operation: impl Into<String>,
        collection: impl Into<String>,
    ) -> AnnotatedQuery<Self> {
        let annotations = QueryAnnotations::new()
            .operation(operation)
            .collection(collection);
        self.with_annotations(annotations)
    }
}
// Opts `sqlx::query::Query` into the sealed extension trait.
impl<DB: sqlx::Database, A> sealed::Sealed for Query<'_, DB, A> {}

impl<DB: sqlx::Database, A> QueryAnnotateExt for Query<'_, DB, A> {
    /// Moves the query into a new [`AnnotatedQuery`] holding `annotations`.
    fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self> {
        let inner = self;
        AnnotatedQuery { inner, annotations }
    }
}
// Opts `sqlx::query::QueryAs` into the sealed extension trait.
impl<DB: sqlx::Database, O, A> sealed::Sealed for QueryAs<'_, DB, O, A> {}

impl<DB: sqlx::Database, O, A> QueryAnnotateExt for QueryAs<'_, DB, O, A> {
    /// Moves the query into a new [`AnnotatedQuery`] holding `annotations`.
    fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self> {
        let inner = self;
        AnnotatedQuery { inner, annotations }
    }
}
// Opts `sqlx::query::QueryScalar` into the sealed extension trait.
impl<DB: sqlx::Database, O, A> sealed::Sealed for QueryScalar<'_, DB, O, A> {}

impl<DB: sqlx::Database, O, A> QueryAnnotateExt for QueryScalar<'_, DB, O, A> {
    /// Moves the query into a new [`AnnotatedQuery`] holding `annotations`.
    fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self> {
        let inner = self;
        AnnotatedQuery { inner, annotations }
    }
}
// Opts `sqlx::query::Map` into the sealed extension trait.
impl<DB: sqlx::Database, F, A> sealed::Sealed for Map<'_, DB, F, A> {}

impl<DB: sqlx::Database, F, A> QueryAnnotateExt for Map<'_, DB, F, A> {
    /// Moves the mapped query into a new [`AnnotatedQuery`] holding `annotations`.
    fn with_annotations(self, annotations: QueryAnnotations) -> AnnotatedQuery<Self> {
        let inner = self;
        AnnotatedQuery { inner, annotations }
    }
}
/// A query value paired with the [`QueryAnnotations`] to use when it runs.
///
/// Produced by [`QueryAnnotateExt::with_annotations`] and
/// [`QueryAnnotateExt::with_operation`]. The `execute` / `fetch*` methods
/// defined in this file consume the wrapper and pass the annotations to the
/// executor via [`IntoAnnotatedExecutor::into_annotated`].
#[must_use = "annotated queries do nothing until you call execute / fetch* on them"]
pub struct AnnotatedQuery<Q> {
/// The wrapped query (`Query`, `QueryAs`, `QueryScalar` or `Map` in this file).
inner: Q,
/// Annotations handed to the executor wrapper when the query is run.
annotations: QueryAnnotations,
}
/// Debug output shows only the annotations. There is no `Q: Debug` bound, so
/// the wrapped query is left out and the struct is rendered as non-exhaustive.
impl<Q> std::fmt::Debug for AnnotatedQuery<Q> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("AnnotatedQuery");
        builder.field("annotations", &self.annotations);
        builder.finish_non_exhaustive()
    }
}
/// Conversion from an executor handle into a wrapper that carries
/// [`QueryAnnotations`].
///
/// The `execute` / `fetch*` methods on [`AnnotatedQuery`] accept any
/// `E: IntoAnnotatedExecutor<'e, DB>`. This file implements it for
/// `&Pool`, `&mut PoolConnection` and `&mut Transaction`.
#[doc(hidden)]
pub trait IntoAnnotatedExecutor<'e, DB: Database> {
/// The wrapper type; it must itself be an `sqlx::Executor` for `DB`.
type Wrapper: sqlx::Executor<'e, Database = DB>;
/// Consumes `self` and returns the wrapper holding `annotations`.
fn into_annotated(self, annotations: QueryAnnotations) -> Self::Wrapper;
}
/// Shared pool references become an [`Annotated`] wrapper.
impl<'e, DB> IntoAnnotatedExecutor<'e, DB> for &'e crate::Pool<DB>
where
    DB: Database,
    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
{
    type Wrapper = Annotated<'e, crate::Pool<DB>>;

    /// Builds the wrapper from the pool reference, a clone of the pool's
    /// `state` field, and the supplied annotations.
    fn into_annotated(self, annotations: QueryAnnotations) -> Self::Wrapper {
        let state = self.state.clone();
        Annotated {
            inner: self,
            annotations,
            state,
        }
    }
}
/// Exclusive pool-connection references become an [`AnnotatedMut`] wrapper.
impl<'e, DB> IntoAnnotatedExecutor<'e, DB> for &'e mut crate::PoolConnection<DB>
where
    DB: Database,
    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
{
    type Wrapper = AnnotatedMut<'e, crate::PoolConnection<DB>>;

    /// Builds the wrapper from a clone of the connection's `state` field, the
    /// supplied annotations, and the connection reference itself.
    fn into_annotated(self, annotations: QueryAnnotations) -> Self::Wrapper {
        // Clone `state` before the `&mut` reference is moved into the wrapper.
        let state = self.state.clone();
        AnnotatedMut {
            inner: self,
            annotations,
            state,
        }
    }
}
/// Exclusive transaction references become an [`AnnotatedMut`] wrapper.
impl<'e, 'tx, DB> IntoAnnotatedExecutor<'e, DB> for &'e mut crate::Transaction<'tx, DB>
where
    DB: Database,
    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
{
    type Wrapper = AnnotatedMut<'e, crate::Transaction<'tx, DB>>;

    /// Builds the wrapper from a clone of the transaction's `state` field, the
    /// supplied annotations, and the transaction reference itself.
    fn into_annotated(self, annotations: QueryAnnotations) -> Self::Wrapper {
        // Clone `state` before the `&mut` reference is moved into the wrapper.
        let state = self.state.clone();
        AnnotatedMut {
            inner: self,
            annotations,
            state,
        }
    }
}
// Generates the annotation setters and the `fetch*` family for one
// `AnnotatedQuery<...>` impl block.
//
// * `row`          - item type produced by the generated `fetch*` methods.
// * `extra_bounds` - additional where-clause predicates (each ending in a
//                    comma) spliced in front of the executor bound.
//
// The expansion refers to `'q`, `DB` and `A`, so it must be invoked inside an
// impl block that declares those generics.
macro_rules! impl_annotated_query_fetch_forwarders {
    (row = $row:ty, extra_bounds = ($($extra_bounds:tt)*)) => {
        /// Discards the current annotations and stores `annotations` instead.
        pub fn with_annotations(self, annotations: QueryAnnotations) -> Self {
            Self {
                inner: self.inner,
                annotations,
            }
        }

        /// Replaces the current annotations with a fresh set on which only
        /// `operation` and `collection` are set.
        pub fn with_operation(
            self,
            operation: impl Into<String>,
            collection: impl Into<String>,
        ) -> Self {
            let annotations = QueryAnnotations::new()
                .operation(operation)
                .collection(collection);
            self.with_annotations(annotations)
        }

        /// Forwards to the inner query's `fetch`, using the annotated
        /// wrapper of `executor`.
        pub fn fetch<'e, E>(self, executor: E) -> BoxStream<'e, Result<$row, sqlx::Error>>
        where
            'q: 'e,
            A: 'e,
            $($extra_bounds)*
            E: 'e + IntoAnnotatedExecutor<'e, DB>,
        {
            let Self { inner, annotations } = self;
            inner.fetch(executor.into_annotated(annotations))
        }

        /// Forwards to the inner query's `fetch_many`, using the annotated
        /// wrapper of `executor`.
        // NOTE(review): `deprecated` is allowed presumably because the
        // forwarded sqlx method is deprecated - confirm against the sqlx
        // version in use.
        #[allow(deprecated, clippy::type_complexity)]
        pub fn fetch_many<'e, E>(
            self,
            executor: E,
        ) -> BoxStream<'e, Result<sqlx::Either<DB::QueryResult, $row>, sqlx::Error>>
        where
            'q: 'e,
            A: 'e,
            $($extra_bounds)*
            E: 'e + IntoAnnotatedExecutor<'e, DB>,
        {
            let Self { inner, annotations } = self;
            inner.fetch_many(executor.into_annotated(annotations))
        }

        /// Forwards to the inner query's `fetch_all`, using the annotated
        /// wrapper of `executor`.
        pub async fn fetch_all<'e, E>(self, executor: E) -> Result<Vec<$row>, sqlx::Error>
        where
            'q: 'e,
            A: 'e,
            $($extra_bounds)*
            E: 'e + IntoAnnotatedExecutor<'e, DB>,
        {
            let Self { inner, annotations } = self;
            inner.fetch_all(executor.into_annotated(annotations)).await
        }

        /// Forwards to the inner query's `fetch_one`, using the annotated
        /// wrapper of `executor`.
        pub async fn fetch_one<'e, E>(self, executor: E) -> Result<$row, sqlx::Error>
        where
            'q: 'e,
            A: 'e,
            $($extra_bounds)*
            E: 'e + IntoAnnotatedExecutor<'e, DB>,
        {
            let Self { inner, annotations } = self;
            inner.fetch_one(executor.into_annotated(annotations)).await
        }

        /// Forwards to the inner query's `fetch_optional`, using the
        /// annotated wrapper of `executor`.
        pub async fn fetch_optional<'e, E>(
            self,
            executor: E,
        ) -> Result<Option<$row>, sqlx::Error>
        where
            'q: 'e,
            A: 'e,
            $($extra_bounds)*
            E: 'e + IntoAnnotatedExecutor<'e, DB>,
        {
            let Self { inner, annotations } = self;
            inner
                .fetch_optional(executor.into_annotated(annotations))
                .await
        }
    };
}
// Generates a `bind` method that forwards to the inner query's `bind` and
// keeps the annotations untouched.
//
// The expansion refers to `'q` and `DB`, so it must be invoked inside an impl
// block that declares those generics.
macro_rules! impl_annotated_query_bind {
    () => {
        /// Binds `value` on the wrapped query and returns the wrapper with
        /// the same annotations.
        pub fn bind<T>(self, value: T) -> Self
        where
            T: 'q + sqlx::Encode<'q, DB> + sqlx::Type<DB>,
        {
            let Self { inner, annotations } = self;
            Self {
                inner: inner.bind(value),
                annotations,
            }
        }
    };
}
/// Execution and mapping API for an annotated [`Query`].
impl<'q, DB, A> AnnotatedQuery<Query<'q, DB, A>>
where
    DB: Database,
    A: 'q + Send + sqlx::IntoArguments<'q, DB>,
{
    // Annotation setters plus the `fetch*` family, yielding raw `DB::Row`s.
    impl_annotated_query_fetch_forwarders!(row = DB::Row, extra_bounds = ());

    /// Forwards to the inner query's `execute`, using the annotated wrapper
    /// of `executor`.
    pub async fn execute<'e, E>(self, executor: E) -> Result<DB::QueryResult, sqlx::Error>
    where
        'q: 'e,
        A: 'e,
        E: 'e + IntoAnnotatedExecutor<'e, DB>,
    {
        let Self { inner, annotations } = self;
        inner.execute(executor.into_annotated(annotations)).await
    }

    /// Forwards to the inner query's `execute_many`, using the annotated
    /// wrapper of `executor`. Awaiting the call yields the result stream.
    // NOTE(review): `deprecated` is allowed presumably because the forwarded
    // sqlx method is deprecated - confirm against the sqlx version in use.
    #[allow(deprecated)]
    pub async fn execute_many<'e, E>(
        self,
        executor: E,
    ) -> BoxStream<'e, Result<DB::QueryResult, sqlx::Error>>
    where
        'q: 'e,
        A: 'e,
        E: 'e + IntoAnnotatedExecutor<'e, DB>,
    {
        let Self { inner, annotations } = self;
        inner
            .execute_many(executor.into_annotated(annotations))
            .await
    }

    /// Applies `f` through the inner query's `map` and carries the
    /// annotations over to the resulting annotated [`Map`].
    #[allow(clippy::type_complexity)]
    pub fn map<F, O>(
        self,
        f: F,
    ) -> AnnotatedQuery<Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx::Error> + Send, A>>
    where
        F: FnMut(DB::Row) -> O + Send,
        O: Unpin,
    {
        let Self { inner, annotations } = self;
        AnnotatedQuery {
            inner: inner.map(f),
            annotations,
        }
    }

    /// Applies the fallible `f` through the inner query's `try_map` and
    /// carries the annotations over to the resulting annotated [`Map`].
    pub fn try_map<F, O>(self, f: F) -> AnnotatedQuery<Map<'q, DB, F, A>>
    where
        F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
        O: Unpin,
    {
        let Self { inner, annotations } = self;
        AnnotatedQuery {
            inner: inner.try_map(f),
            annotations,
        }
    }
}
// `bind` for an annotated `Query` whose argument type is the database's own
// `Arguments<'q>` (the type named in the impl header below).
impl<'q, DB> AnnotatedQuery<Query<'q, DB, <DB as sqlx::Database>::Arguments<'q>>>
where
DB: sqlx::Database,
{
// Expands to the `bind` method defined by `impl_annotated_query_bind!`.
impl_annotated_query_bind!();
}
// Annotation setters and `fetch*` methods for an annotated `QueryAs`.
// Rows are yielded as `O`, which must implement `sqlx::FromRow` for `DB::Row`.
impl<'q, DB, O, A> AnnotatedQuery<QueryAs<'q, DB, O, A>>
where
DB: Database,
A: 'q + Send + sqlx::IntoArguments<'q, DB>,
O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
{
// `DB: 'e` and `O: 'e` are added to every generated method's where clause.
impl_annotated_query_fetch_forwarders!(row = O, extra_bounds = (DB: 'e, O: 'e,));
}
// `bind` for an annotated `QueryAs` whose argument type is the database's own
// `Arguments<'q>` (the type named in the impl header below).
impl<'q, DB, O> AnnotatedQuery<QueryAs<'q, DB, O, <DB as sqlx::Database>::Arguments<'q>>>
where
DB: sqlx::Database,
{
// Expands to the `bind` method defined by `impl_annotated_query_bind!`.
impl_annotated_query_bind!();
}
// Annotation setters and `fetch*` methods for an annotated `QueryScalar`.
// Values are yielded as `O`; the `FromRow` requirement is placed on the
// one-element tuple `(O,)`.
impl<'q, DB, O, A> AnnotatedQuery<QueryScalar<'q, DB, O, A>>
where
DB: Database,
A: 'q + Send + sqlx::IntoArguments<'q, DB>,
O: Send + Unpin,
(O,): Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
{
// `DB: 'e` and `O: 'e` are added to every generated method's where clause.
impl_annotated_query_fetch_forwarders!(row = O, extra_bounds = (DB: 'e, O: 'e,));
}
// `bind` for an annotated `QueryScalar` whose argument type is the database's
// own `Arguments<'q>` (the type named in the impl header below).
impl<'q, DB, O> AnnotatedQuery<QueryScalar<'q, DB, O, <DB as sqlx::Database>::Arguments<'q>>>
where
DB: sqlx::Database,
{
// Expands to the `bind` method defined by `impl_annotated_query_bind!`.
impl_annotated_query_bind!();
}
/// Fetch and further-mapping API for an annotated [`Map`].
impl<'q, DB, F, A, O> AnnotatedQuery<Map<'q, DB, F, A>>
where
    DB: Database,
    F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
    O: Send + Unpin,
    A: 'q + Send + sqlx::IntoArguments<'q, DB>,
{
    // Annotation setters plus the `fetch*` family, yielding mapped `O` values.
    impl_annotated_query_fetch_forwarders!(row = O, extra_bounds = (DB: 'e, F: 'e, O: 'e,));

    /// Chains `g` after the existing mapper through the inner `Map::map`,
    /// keeping the annotations.
    #[allow(clippy::type_complexity)]
    pub fn map<G, P>(
        self,
        g: G,
    ) -> AnnotatedQuery<Map<'q, DB, impl FnMut(DB::Row) -> Result<P, sqlx::Error> + Send, A>>
    where
        G: FnMut(O) -> P + Send,
        P: Unpin,
    {
        let Self { inner, annotations } = self;
        AnnotatedQuery {
            inner: inner.map(g),
            annotations,
        }
    }

    /// Chains the fallible `g` after the existing mapper through the inner
    /// `Map::try_map`, keeping the annotations.
    #[allow(clippy::type_complexity)]
    pub fn try_map<G, P>(
        self,
        g: G,
    ) -> AnnotatedQuery<Map<'q, DB, impl FnMut(DB::Row) -> Result<P, sqlx::Error> + Send, A>>
    where
        G: FnMut(O) -> Result<P, sqlx::Error> + Send,
        P: Unpin,
    {
        let Self { inner, annotations } = self;
        AnnotatedQuery {
            inner: inner.try_map(g),
            annotations,
        }
    }
}
// Unit tests for the builder surface of `AnnotatedQuery`.
// Compiled only for test builds with the `sqlite` feature enabled.
// No test here runs a query against a database: each one builds a wrapper and
// inspects its `annotations` / `inner` fields or its `Debug` output.
//
// In a chain, the first `with_annotations` / `with_operation` call on a plain
// sqlx query goes through `QueryAnnotateExt`; any later call is made on the
// resulting `AnnotatedQuery` and uses its inherent method.
#[cfg(all(test, feature = "sqlite"))]
mod tests {
// Brings `.sql()` into scope for the `q.inner.sql()` assertions.
use sqlx::Execute as _;
use sqlx::Sqlite;
use super::*;
// A second `with_annotations` call overwrites the first.
#[test]
fn with_annotations_replaces_previous() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.with_annotations(QueryAnnotations::new().operation("FIRST"))
.with_annotations(QueryAnnotations::new().operation("SECOND"));
assert_eq!(q.annotations.operation.as_deref(), Some("SECOND"));
}
// `with_operation` sets operation and collection and leaves the other fields unset.
#[test]
fn with_operation_sets_both_fields() {
let q = sqlx::query::<Sqlite>("SELECT 1").with_operation("SELECT", "users");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
assert_eq!(q.annotations.collection.as_deref(), Some("users"));
assert!(q.annotations.query_summary.is_none());
assert!(q.annotations.stored_procedure.is_none());
}
// `with_operation` on the wrapper discards previously set annotations.
#[test]
fn with_operation_replaces_previous_annotations() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.with_annotations(QueryAnnotations::new().query_summary("legacy summary"))
.with_operation("SELECT", "users");
assert!(
q.annotations.query_summary.is_none(),
"with_operation must replace, not merge"
);
}
// `Debug` output names the type and includes the annotation values.
#[test]
fn debug_impl_includes_annotations() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.with_annotations(QueryAnnotations::new().operation("DEBUG_OP"));
let debug = format!("{q:?}");
assert!(debug.contains("AnnotatedQuery"));
assert!(debug.contains("DEBUG_OP"));
}
// sqlx's own `bind` first, then wrapping: SQL text and annotations both survive.
#[test]
fn bind_then_with_annotations_compose() {
let q = sqlx::query::<Sqlite>("SELECT ?1, ?2")
.bind(1_i32)
.with_annotations(QueryAnnotations::new().operation("SELECT"));
assert_eq!(q.inner.sql(), "SELECT ?1, ?2");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
}
// Wrapping first, then the wrapper's `bind`: SQL text and annotations both survive.
#[test]
fn with_annotations_first_then_bind_compose() {
let q = sqlx::query::<Sqlite>("SELECT ?1, ?2")
.with_annotations(QueryAnnotations::new().operation("SELECT"))
.bind(1_i32);
assert_eq!(q.inner.sql(), "SELECT ?1, ?2");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
}
// `QueryAs` is covered by the extension trait.
#[test]
fn query_as_supports_with_annotations() {
let q = sqlx::query_as::<Sqlite, (i32,)>("SELECT 1").with_operation("SELECT", "users");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
assert_eq!(q.annotations.collection.as_deref(), Some("users"));
}
// Replacement semantics on an annotated `QueryAs`.
#[test]
fn query_as_wrapper_with_annotations_replaces() {
let q = sqlx::query_as::<Sqlite, (i32,)>("SELECT 1")
.with_annotations(QueryAnnotations::new().operation("FIRST"))
.with_annotations(QueryAnnotations::new().operation("SECOND"));
assert_eq!(q.annotations.operation.as_deref(), Some("SECOND"));
}
// `with_operation` on an annotated `QueryAs` replaces the earlier summary.
#[test]
fn query_as_wrapper_with_operation_chains_on_wrapper() {
let q = sqlx::query_as::<Sqlite, (i32,)>("SELECT 1")
.with_annotations(QueryAnnotations::new().query_summary("legacy"))
.with_operation("SELECT", "users");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
assert_eq!(q.annotations.collection.as_deref(), Some("users"));
assert!(q.annotations.query_summary.is_none());
}
// The wrapper's `bind` works for `QueryAs` and keeps the annotations.
#[test]
fn query_as_wrapper_bind_after_annotations() {
let q = sqlx::query_as::<Sqlite, (i32,)>("SELECT ?1")
.with_annotations(QueryAnnotations::new().operation("SELECT"))
.bind(7_i32);
assert_eq!(q.inner.sql(), "SELECT ?1");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
}
// `QueryScalar` is covered by the extension trait.
#[test]
fn query_scalar_supports_with_annotations() {
let q = sqlx::query_scalar::<Sqlite, i32>("SELECT 1").with_operation("SELECT", "users");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
assert_eq!(q.annotations.collection.as_deref(), Some("users"));
}
// Replacement semantics on an annotated `QueryScalar`.
#[test]
fn query_scalar_wrapper_with_annotations_replaces() {
let q = sqlx::query_scalar::<Sqlite, i32>("SELECT 1")
.with_annotations(QueryAnnotations::new().operation("FIRST"))
.with_annotations(QueryAnnotations::new().operation("SECOND"));
assert_eq!(q.annotations.operation.as_deref(), Some("SECOND"));
}
// `with_operation` on an annotated `QueryScalar` replaces the earlier summary.
#[test]
fn query_scalar_wrapper_with_operation_chains_on_wrapper() {
let q = sqlx::query_scalar::<Sqlite, i32>("SELECT 1")
.with_annotations(QueryAnnotations::new().query_summary("legacy"))
.with_operation("SELECT", "users");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
assert_eq!(q.annotations.collection.as_deref(), Some("users"));
assert!(q.annotations.query_summary.is_none());
}
// The wrapper's `bind` works for `QueryScalar` and keeps the annotations.
#[test]
fn query_scalar_wrapper_bind_after_annotations() {
let q = sqlx::query_scalar::<Sqlite, i32>("SELECT ?1")
.with_annotations(QueryAnnotations::new().operation("SELECT"))
.bind(7_i32);
assert_eq!(q.inner.sql(), "SELECT ?1");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
}
// The wrapper's `map` carries the annotations into the annotated `Map`.
#[test]
fn query_with_annotations_map_preserves_annotations() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.with_annotations(QueryAnnotations::new().operation("SELECT"))
.map(|_row: sqlx::sqlite::SqliteRow| 42_i64);
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
}
// Same as above, with a sqlx-level `bind` before wrapping.
#[test]
fn query_bind_with_annotations_map_preserves_annotations() {
let q = sqlx::query::<Sqlite>("SELECT ?1")
.bind(1_i32)
.with_annotations(QueryAnnotations::new().operation("SELECT"))
.map(|_row: sqlx::sqlite::SqliteRow| 42_i64);
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
}
// sqlx's own `map` first, then wrapping the `Map`; the second set of annotations wins.
#[test]
fn query_map_with_annotations_replaces_previous() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.map(|_row: sqlx::sqlite::SqliteRow| 42_i64)
.with_annotations(QueryAnnotations::new().operation("FIRST"))
.with_annotations(QueryAnnotations::new().operation("SECOND"));
assert_eq!(q.annotations.operation.as_deref(), Some("SECOND"));
}
// sqlx's own `try_map` first, then `with_operation` via the extension trait.
#[test]
fn query_try_map_with_annotations_compose() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.try_map(|_row: sqlx::sqlite::SqliteRow| Ok::<_, sqlx::Error>(42_i64))
.with_operation("SELECT", "users");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
assert_eq!(q.annotations.collection.as_deref(), Some("users"));
}
// The wrapper's `try_map` carries the annotations over.
#[test]
fn annotated_query_try_map_preserves_annotations() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.with_annotations(QueryAnnotations::new().operation("SELECT"))
.try_map(|_row: sqlx::sqlite::SqliteRow| Ok::<_, sqlx::Error>(42_i64));
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
}
// Two successive `map` calls on the wrapper (Query -> Map -> Map) keep the annotations.
#[test]
fn map_compose_after_annotations() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.with_annotations(QueryAnnotations::new().operation("SELECT"))
.map(|_row: sqlx::sqlite::SqliteRow| 1_i64)
.map(|n| n + 1);
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
}
// `with_operation` on an annotated `Map` replaces the earlier summary.
#[test]
fn map_then_with_operation_replaces_via_wrapper() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.map(|_row: sqlx::sqlite::SqliteRow| 1_i64)
.with_annotations(QueryAnnotations::new().query_summary("legacy"))
.with_operation("SELECT", "users");
assert_eq!(q.annotations.operation.as_deref(), Some("SELECT"));
assert_eq!(q.annotations.collection.as_deref(), Some("users"));
assert!(q.annotations.query_summary.is_none());
}
// `Debug` also works when the wrapped value is a `Map` holding a closure.
#[test]
fn debug_impl_for_annotated_map_includes_annotations() {
let q = sqlx::query::<Sqlite>("SELECT 1")
.map(|_row: sqlx::sqlite::SqliteRow| 1_i64)
.with_annotations(QueryAnnotations::new().operation("DEBUG_MAP"));
let debug = format!("{q:?}");
assert!(debug.contains("AnnotatedQuery"));
assert!(debug.contains("DEBUG_MAP"));
}
}