#[cfg(feature = "postgres")]
use super::bind_query_as;
#[cfg(feature = "mysql")]
use super::bind_query_as_my;
#[cfg(feature = "sqlite")]
use super::bind_query_as_sqlite;
#[cfg(feature = "postgres")]
use sqlx::postgres::{PgArguments, PgRow};
#[cfg(feature = "postgres")]
use sqlx::query::Query;
use super::ExecError;
use crate::core::{AggregateQuery, SelectQuery, SqlValue};
use crate::sql::Pool;
use crate::sql::{
MaybeMyFromRow, MaybeMyLoadRelated, MaybePgFromRow, MaybeSqliteFromRow, MaybeSqliteLoadRelated,
UpdaterPool as _,
};
#[cfg(feature = "postgres")]
use super::bind_query;
#[cfg(feature = "mysql")]
use super::bind_query_my;
#[cfg(feature = "sqlite")]
use super::bind_query_sqlite;
#[cfg(feature = "postgres")]
fn pg_cell_to_sqlvalue(row: &PgRow, i: usize) -> SqlValue {
use sqlx::Row as _;
if let Ok(v) = row.try_get::<i64, _>(i) {
SqlValue::I64(v)
} else if let Ok(v) = row.try_get::<i32, _>(i) {
SqlValue::I32(v)
} else if let Ok(v) = row.try_get::<f64, _>(i) {
SqlValue::F64(v)
} else if let Ok(v) = row.try_get::<bool, _>(i) {
SqlValue::Bool(v)
} else if let Ok(v) = row.try_get::<rust_decimal::Decimal, _>(i) {
SqlValue::Decimal(v)
} else if let Ok(v) = row.try_get::<String, _>(i) {
SqlValue::String(v)
} else if let Ok(v) = row.try_get::<serde_json::Value, _>(i) {
SqlValue::Json(v)
} else {
SqlValue::Null
}
}
#[cfg(feature = "mysql")]
fn my_cell_to_sqlvalue(row: &sqlx::mysql::MySqlRow, i: usize) -> SqlValue {
use sqlx::{Column as _, Row as _, TypeInfo as _};
if row.column(i).type_info().name().contains("UNSIGNED") {
if let Ok(v) = row.try_get::<u64, _>(i) {
return i64::try_from(v)
.map_or_else(|_| SqlValue::String(v.to_string()), SqlValue::I64);
}
if let Ok(v) = row.try_get::<u32, _>(i) {
return SqlValue::I64(i64::from(v));
}
}
if let Ok(v) = row.try_get::<i64, _>(i) {
SqlValue::I64(v)
} else if let Ok(v) = row.try_get::<i32, _>(i) {
SqlValue::I32(v)
} else if let Ok(v) = row.try_get::<f64, _>(i) {
SqlValue::F64(v)
} else if let Ok(v) = row.try_get::<bool, _>(i) {
SqlValue::Bool(v)
} else if let Ok(v) = row.try_get::<rust_decimal::Decimal, _>(i) {
SqlValue::Decimal(v)
} else if let Ok(v) = row.try_get::<String, _>(i) {
SqlValue::String(v)
} else {
SqlValue::Null
}
}
#[cfg(feature = "sqlite")]
fn sqlite_cell_to_sqlvalue(row: &sqlx::sqlite::SqliteRow, i: usize) -> SqlValue {
use sqlx::{Row as _, TypeInfo as _, ValueRef as _};
if let Ok(raw) = row.try_get_raw(i) {
let is_null = raw.is_null();
let is_text = raw.type_info().name() == "TEXT";
let as_text = row.try_get_unchecked::<String, _>(i);
if is_text {
return if is_null {
SqlValue::Null
} else {
as_text.map_or(SqlValue::Null, SqlValue::String)
};
}
}
if let Ok(v) = row.try_get::<i64, _>(i) {
SqlValue::I64(v)
} else if let Ok(v) = row.try_get::<i32, _>(i) {
SqlValue::I32(v)
} else if let Ok(v) = row.try_get::<f64, _>(i) {
SqlValue::F64(v)
} else if let Ok(v) = row.try_get::<bool, _>(i) {
SqlValue::Bool(v)
} else if let Ok(v) = row.try_get::<String, _>(i) {
SqlValue::String(v)
} else {
SqlValue::Null
}
}
pub async fn fetch_values_dict(
pool: &Pool,
query: &SelectQuery,
) -> Result<Vec<std::collections::HashMap<String, SqlValue>>, ExecError> {
let stmt = pool.dialect().compile_select(query)?;
match pool {
#[cfg(feature = "postgres")]
Pool::Postgres(pg) => {
let mut q: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query(q, v);
}
let rows = q.fetch_all(pg).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Column as _;
use sqlx::Row as _;
let mut map = std::collections::HashMap::new();
for (i, col) in row.columns().iter().enumerate() {
map.insert(col.name().to_owned(), pg_cell_to_sqlvalue(row, i));
}
out.push(map);
}
Ok(out)
}
#[cfg(feature = "mysql")]
Pool::Mysql(my) => {
let mut q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> =
sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query_my(q, v);
}
let rows = q.fetch_all(my).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Column as _;
use sqlx::Row as _;
let mut map = std::collections::HashMap::new();
for (i, col) in row.columns().iter().enumerate() {
map.insert(col.name().to_owned(), my_cell_to_sqlvalue(row, i));
}
out.push(map);
}
Ok(out)
}
#[cfg(feature = "sqlite")]
Pool::Sqlite(sq) => {
let mut q: sqlx::query::Query<'_, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'_>> =
sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query_sqlite(q, v);
}
let rows = q.fetch_all(sq).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Column as _;
use sqlx::Row as _;
let mut map = std::collections::HashMap::new();
for (i, col) in row.columns().iter().enumerate() {
map.insert(col.name().to_owned(), sqlite_cell_to_sqlvalue(row, i));
}
out.push(map);
}
Ok(out)
}
}
}
pub async fn fetch_aggregate_dict(
pool: &Pool,
query: &AggregateQuery,
) -> Result<Vec<std::collections::HashMap<String, SqlValue>>, ExecError> {
let stmt = pool.dialect().compile_aggregate(query)?;
match pool {
#[cfg(feature = "postgres")]
Pool::Postgres(pg) => {
let mut q: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query(q, v);
}
let rows = q.fetch_all(pg).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Column as _;
use sqlx::Row as _;
let mut map = std::collections::HashMap::new();
for (i, col) in row.columns().iter().enumerate() {
map.insert(col.name().to_owned(), pg_cell_to_sqlvalue(row, i));
}
out.push(map);
}
Ok(out)
}
#[cfg(feature = "mysql")]
Pool::Mysql(my) => {
let mut q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> =
sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query_my(q, v);
}
let rows = q.fetch_all(my).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Column as _;
use sqlx::Row as _;
let mut map = std::collections::HashMap::new();
for (i, col) in row.columns().iter().enumerate() {
map.insert(col.name().to_owned(), my_cell_to_sqlvalue(row, i));
}
out.push(map);
}
Ok(out)
}
#[cfg(feature = "sqlite")]
Pool::Sqlite(sq) => {
let mut q: sqlx::query::Query<'_, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'_>> =
sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query_sqlite(q, v);
}
let rows = q.fetch_all(sq).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Column as _;
use sqlx::Row as _;
let mut map = std::collections::HashMap::new();
for (i, col) in row.columns().iter().enumerate() {
map.insert(col.name().to_owned(), sqlite_cell_to_sqlvalue(row, i));
}
out.push(map);
}
Ok(out)
}
}
}
pub async fn fetch_values_list(
pool: &Pool,
query: &SelectQuery,
) -> Result<Vec<Vec<SqlValue>>, ExecError> {
let stmt = pool.dialect().compile_select(query)?;
match pool {
#[cfg(feature = "postgres")]
Pool::Postgres(pg) => {
let mut q: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query(q, v);
}
let rows = q.fetch_all(pg).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Row as _;
let n = row.columns().len();
let mut v = Vec::with_capacity(n);
for i in 0..n {
v.push(pg_cell_to_sqlvalue(row, i));
}
out.push(v);
}
Ok(out)
}
#[cfg(feature = "mysql")]
Pool::Mysql(my) => {
let mut q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> =
sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query_my(q, v);
}
let rows = q.fetch_all(my).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Row as _;
let n = row.columns().len();
let mut v = Vec::with_capacity(n);
for i in 0..n {
v.push(my_cell_to_sqlvalue(row, i));
}
out.push(v);
}
Ok(out)
}
#[cfg(feature = "sqlite")]
Pool::Sqlite(sq) => {
let mut q: sqlx::query::Query<'_, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'_>> =
sqlx::query(&stmt.sql);
for v in stmt.params {
q = bind_query_sqlite(q, v);
}
let rows = q.fetch_all(sq).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
use sqlx::Row as _;
let n = row.columns().len();
let mut v = Vec::with_capacity(n);
for i in 0..n {
v.push(sqlite_cell_to_sqlvalue(row, i));
}
out.push(v);
}
Ok(out)
}
}
}
#[cfg(feature = "postgres")]
pub trait MaybePgScalar:
for<'r> sqlx::Decode<'r, sqlx::Postgres> + sqlx::Type<sqlx::Postgres>
{
}
#[cfg(feature = "postgres")]
impl<T> MaybePgScalar for T where
T: for<'r> sqlx::Decode<'r, sqlx::Postgres> + sqlx::Type<sqlx::Postgres>
{
}
#[cfg(not(feature = "postgres"))]
pub trait MaybePgScalar {}
#[cfg(not(feature = "postgres"))]
impl<T> MaybePgScalar for T {}
#[cfg(feature = "mysql")]
pub trait MaybeMyScalar: for<'r> sqlx::Decode<'r, sqlx::MySql> + sqlx::Type<sqlx::MySql> {}
#[cfg(feature = "mysql")]
impl<T> MaybeMyScalar for T where T: for<'r> sqlx::Decode<'r, sqlx::MySql> + sqlx::Type<sqlx::MySql> {}
#[cfg(not(feature = "mysql"))]
pub trait MaybeMyScalar {}
#[cfg(not(feature = "mysql"))]
impl<T> MaybeMyScalar for T {}
#[cfg(feature = "sqlite")]
pub trait MaybeSqliteScalar:
for<'r> sqlx::Decode<'r, sqlx::Sqlite> + sqlx::Type<sqlx::Sqlite>
{
}
#[cfg(feature = "sqlite")]
impl<T> MaybeSqliteScalar for T where
T: for<'r> sqlx::Decode<'r, sqlx::Sqlite> + sqlx::Type<sqlx::Sqlite>
{
}
#[cfg(not(feature = "sqlite"))]
pub trait MaybeSqliteScalar {}
#[cfg(not(feature = "sqlite"))]
impl<T> MaybeSqliteScalar for T {}
pub async fn fetch_values_flat<U>(pool: &Pool, query: &SelectQuery) -> Result<Vec<U>, ExecError>
where
U: MaybePgScalar + MaybeMyScalar + MaybeSqliteScalar + Send + Unpin,
{
let stmt = pool.dialect().compile_select(query)?;
match pool {
#[cfg(feature = "postgres")]
Pool::Postgres(pg) => {
let mut q: sqlx::query::QueryScalar<'_, sqlx::Postgres, U, PgArguments> =
sqlx::query_scalar(&stmt.sql);
for v in stmt.params {
q = bind_query_scalar_pg(q, v);
}
Ok(q.fetch_all(pg).await?)
}
#[cfg(feature = "mysql")]
Pool::Mysql(my) => {
let mut q: sqlx::query::QueryScalar<'_, sqlx::MySql, U, sqlx::mysql::MySqlArguments> =
sqlx::query_scalar(&stmt.sql);
for v in stmt.params {
q = bind_query_scalar_my(q, v);
}
Ok(q.fetch_all(my).await?)
}
#[cfg(feature = "sqlite")]
Pool::Sqlite(sq) => {
let mut q: sqlx::query::QueryScalar<
'_,
sqlx::Sqlite,
U,
sqlx::sqlite::SqliteArguments<'_>,
> = sqlx::query_scalar(&stmt.sql);
for v in stmt.params {
q = bind_query_scalar_sqlite(q, v);
}
Ok(q.fetch_all(sq).await?)
}
}
}
pub async fn fetch_values_pairs<K, V>(
pool: &Pool,
query: &SelectQuery,
) -> Result<Vec<(K, V)>, ExecError>
where
K: Send + Unpin,
V: Send + Unpin,
(K, V): MaybePgFromRow + MaybeMyFromRow + MaybeSqliteFromRow + Send + Unpin,
{
let stmt = pool.dialect().compile_select(query)?;
match pool {
#[cfg(feature = "postgres")]
Pool::Postgres(pg) => {
let mut q: sqlx::query::QueryAs<'_, sqlx::Postgres, (K, V), PgArguments> =
sqlx::query_as::<_, (K, V)>(&stmt.sql);
for v in stmt.params {
q = bind_query_as(q, v);
}
Ok(q.fetch_all(pg).await?)
}
#[cfg(feature = "mysql")]
Pool::Mysql(my) => {
let mut q: sqlx::query::QueryAs<'_, sqlx::MySql, (K, V), sqlx::mysql::MySqlArguments> =
sqlx::query_as::<_, (K, V)>(&stmt.sql);
for v in stmt.params {
q = bind_query_as_my(q, v);
}
Ok(q.fetch_all(my).await?)
}
#[cfg(feature = "sqlite")]
Pool::Sqlite(sq) => {
let mut q: sqlx::query::QueryAs<
'_,
sqlx::Sqlite,
(K, V),
sqlx::sqlite::SqliteArguments<'_>,
> = sqlx::query_as::<_, (K, V)>(&stmt.sql);
for v in stmt.params {
q = bind_query_as_sqlite(q, v);
}
Ok(q.fetch_all(sq).await?)
}
}
}
#[cfg(feature = "postgres")]
fn bind_query_scalar_pg<U>(
q: sqlx::query::QueryScalar<'_, sqlx::Postgres, U, PgArguments>,
value: SqlValue,
) -> sqlx::query::QueryScalar<'_, sqlx::Postgres, U, PgArguments> {
bind_match!(q, value)
}
#[cfg(feature = "mysql")]
fn bind_query_scalar_my<U>(
q: sqlx::query::QueryScalar<'_, sqlx::MySql, U, sqlx::mysql::MySqlArguments>,
value: SqlValue,
) -> sqlx::query::QueryScalar<'_, sqlx::MySql, U, sqlx::mysql::MySqlArguments> {
bind_match_mysql!(q, value)
}
#[cfg(feature = "sqlite")]
fn bind_query_scalar_sqlite<'a, U>(
q: sqlx::query::QueryScalar<'a, sqlx::Sqlite, U, sqlx::sqlite::SqliteArguments<'a>>,
value: SqlValue,
) -> sqlx::query::QueryScalar<'a, sqlx::Sqlite, U, sqlx::sqlite::SqliteArguments<'a>> {
bind_match_sqlite!(q, value)
}
impl<T: crate::core::Model> crate::query::ValuesQuerySet<T> {
pub async fn fetch(
self,
pool: &Pool,
) -> Result<Vec<std::collections::HashMap<String, SqlValue>>, ExecError> {
let q = self.compile()?;
fetch_values_dict(pool, &q).await
}
}
impl<T: crate::core::Model> crate::query::AggregateBuilder<T> {
pub async fn fetch(
self,
pool: &Pool,
) -> Result<Vec<std::collections::HashMap<String, SqlValue>>, ExecError> {
let q = self.compile()?;
fetch_aggregate_dict(pool, &q).await
}
}
impl<T: crate::core::Model> crate::query::ValuesListQuerySet<T> {
pub async fn fetch(self, pool: &Pool) -> Result<Vec<Vec<SqlValue>>, ExecError> {
let q = self.compile()?;
fetch_values_list(pool, &q).await
}
}
impl<T: crate::core::Model> crate::query::QuerySet<T> {
pub async fn pluck<U>(self, col: &'static str, pool: &Pool) -> Result<Vec<U>, ExecError>
where
U: MaybePgScalar + MaybeMyScalar + MaybeSqliteScalar + Send + Unpin,
{
self.values_list_flat(col).fetch::<U>(pool).await
}
pub async fn pks<K>(self, pool: &Pool) -> Result<Vec<K>, ExecError>
where
K: MaybePgScalar + MaybeMyScalar + MaybeSqliteScalar + Send + Unpin,
{
let pk_col = T::SCHEMA.primary_key().map(|f| f.column).ok_or_else(|| {
ExecError::Query(crate::core::QueryError::UnknownField {
model: T::SCHEMA.name,
field: "<pk>".to_string(),
})
})?;
self.pluck::<K>(pk_col, pool).await
}
pub async fn pluck_pairs<K, V>(
self,
key_col: &'static str,
value_col: &'static str,
pool: &Pool,
) -> Result<Vec<(K, V)>, ExecError>
where
K: Send + Unpin,
V: Send + Unpin,
(K, V): MaybePgFromRow + MaybeMyFromRow + MaybeSqliteFromRow + Send + Unpin,
{
let q = self.values_list(&[key_col, value_col]).compile()?;
fetch_values_pairs::<K, V>(pool, &q).await
}
pub async fn value<U>(self, col: &'static str, pool: &Pool) -> Result<Option<U>, ExecError>
where
U: MaybePgScalar + MaybeMyScalar + MaybeSqliteScalar + Send + Unpin,
{
self.values_list_flat(col).first::<U>(pool).await
}
pub fn to_sql(self, pool: &Pool) -> Result<String, ExecError> {
let q = self.compile()?;
let stmt = pool.dialect().compile_select(&q)?;
Ok(stmt.sql)
}
pub async fn sum<U>(self, col: &str, pool: &Pool) -> Result<Option<U>, ExecError>
where
(Option<U>,): crate::sql::MaybePgFromRow
+ crate::sql::MaybeMyFromRow
+ crate::sql::MaybeSqliteFromRow
+ Send
+ Unpin,
{
self.queryset_aggregate_one::<U>(col, crate::core::AggregateExpr::Sum, pool)
.await
}
pub async fn paginate(
self,
page: i64,
per_page: i64,
pool: &Pool,
) -> Result<(Vec<T>, i64), ExecError>
where
T: crate::core::Model
+ crate::sql::MaybePgFromRow
+ crate::sql::MaybeMyFromRow
+ crate::sql::MaybeSqliteFromRow
+ crate::sql::LoadRelated
+ crate::sql::MaybeMyLoadRelated
+ crate::sql::MaybeSqliteLoadRelated
+ Send
+ Unpin,
{
use crate::sql::{CounterPool as _, FetcherPool as _};
let total = <crate::query::QuerySet<T> as ::core::clone::Clone>::clone(&self)
.count(pool)
.await?;
let offset = if page > 1 { (page - 1) * per_page } else { 0 };
let rows = self.limit(per_page).offset(offset).fetch(pool).await?;
Ok((rows, total))
}
pub async fn avg<U>(self, col: &str, pool: &Pool) -> Result<Option<U>, ExecError>
where
(Option<U>,): crate::sql::MaybePgFromRow
+ crate::sql::MaybeMyFromRow
+ crate::sql::MaybeSqliteFromRow
+ Send
+ Unpin,
{
self.queryset_aggregate_one::<U>(col, crate::core::AggregateExpr::Avg, pool)
.await
}
pub async fn min<U>(self, col: &str, pool: &Pool) -> Result<Option<U>, ExecError>
where
(Option<U>,): crate::sql::MaybePgFromRow
+ crate::sql::MaybeMyFromRow
+ crate::sql::MaybeSqliteFromRow
+ Send
+ Unpin,
{
self.queryset_aggregate_one::<U>(col, crate::core::AggregateExpr::Min, pool)
.await
}
pub async fn max<U>(self, col: &str, pool: &Pool) -> Result<Option<U>, ExecError>
where
(Option<U>,): crate::sql::MaybePgFromRow
+ crate::sql::MaybeMyFromRow
+ crate::sql::MaybeSqliteFromRow
+ Send
+ Unpin,
{
self.queryset_aggregate_one::<U>(col, crate::core::AggregateExpr::Max, pool)
.await
}
async fn queryset_aggregate_one<U>(
self,
col: &str,
build: fn(&'static str) -> crate::core::AggregateExpr,
pool: &Pool,
) -> Result<Option<U>, ExecError>
where
(Option<U>,): crate::sql::MaybePgFromRow
+ crate::sql::MaybeMyFromRow
+ crate::sql::MaybeSqliteFromRow
+ Send
+ Unpin,
{
let col_static = crate::sql::model_shortcuts::resolve_col::<T>(col)?;
let select_q = self.compile()?;
let aggregate_q = crate::core::AggregateQuery {
model: <T as crate::core::Model>::SCHEMA,
joins: select_q.joins,
where_clause: select_q.where_clause,
group_by: Vec::new(),
aggregates: vec![("v".into(), build(col_static))],
aliases: Vec::new(),
having: None,
order_by: Vec::new(),
limit: None,
offset: None,
};
let rows: Vec<(Option<U>,)> = crate::sql::fetch_aggregate_pool(pool, &aggregate_q).await?;
Ok(rows.into_iter().next().and_then(|t| t.0))
}
pub fn to_compiled(self, pool: &Pool) -> Result<crate::sql::CompiledStatement, ExecError> {
let q = self.compile()?;
let stmt = pool.dialect().compile_select(&q)?;
Ok(stmt)
}
}
impl<T> crate::query::QuerySet<T>
where
T: crate::core::Model
+ Send
+ Unpin
+ MaybePgFromRow
+ MaybeMyFromRow
+ MaybeSqliteFromRow
+ crate::sql::LoadRelated
+ MaybeMyLoadRelated
+ MaybeSqliteLoadRelated,
{
pub async fn increment(self, col: &str, by: i64, pool: &Pool) -> Result<u64, ExecError> {
let col_static = crate::sql::model_shortcuts::resolve_col::<T>(col)?;
self.update()
.set_expr(
col,
crate::sql::model_shortcuts::add_signed_expr(col_static, by),
)
.execute_pool(pool)
.await
}
pub async fn decrement(self, col: &str, by: i64, pool: &Pool) -> Result<u64, ExecError> {
self.increment(col, -by, pool).await
}
}
impl<T: crate::core::Model> crate::query::ValuesFlatQuerySet<T> {
pub async fn fetch<U>(self, pool: &Pool) -> Result<Vec<U>, ExecError>
where
U: MaybePgScalar + MaybeMyScalar + MaybeSqliteScalar + Send + Unpin,
{
let q = self.compile()?;
fetch_values_flat::<U>(pool, &q).await
}
pub async fn first<U>(self, pool: &Pool) -> Result<Option<U>, ExecError>
where
U: MaybePgScalar + MaybeMyScalar + MaybeSqliteScalar + Send + Unpin,
{
let col = self.col;
let qs = self.qs.limit(1);
let q = crate::query::ValuesFlatQuerySet { qs, col }.compile()?;
let rows = fetch_values_flat::<U>(pool, &q).await?;
Ok(rows.into_iter().next())
}
}