use std::marker::PhantomData;
use crate::core::error::Result;
use driver::pipeline::QueryResult;
use driver::{Connection, FromSql, Oid, Row, ToSql};
/// Conversion of a single driver [`Row`] into a typed value.
///
/// This module implements the trait for tuples of one to sixteen
/// [`FromSql`] elements through `impl_from_row_tuple!`.
pub trait FromRow: Sized {
/// Builds `Self` from `row`.
///
/// Returns the crate-level `Result`, so implementations report a failed
/// conversion as `Err` and callers can propagate it with `?`.
fn from_row(row: &Row) -> Result<Self>;
}
/// Borrowed description of one typed query: its SQL text, the type OID of
/// every parameter, and two static labels used when the query is reported.
pub struct TypedQueryHandle<'sql> {
    /// SQL text handed to the connection when the query runs.
    pub sql: &'sql str,
    /// One type OID per bound parameter, in parameter order.
    pub param_oids: Vec<Oid>,
    /// Label forwarded to `crate::__priv::emit_query_macro`; presumably the
    /// name of the macro that produced this handle.
    pub macro_name: &'static str,
    /// Identifier forwarded to `crate::__priv::emit_query_macro` together
    /// with `macro_name`.
    pub query_id: &'static str,
}

impl<'sql> TypedQueryHandle<'sql> {
    /// Assembles a handle from its four parts; no validation is performed.
    pub fn new(
        sql: &'sql str,
        param_oids: Vec<Oid>,
        macro_name: &'static str,
        query_id: &'static str,
    ) -> Self {
        TypedQueryHandle {
            sql,
            param_oids,
            macro_name,
            query_id,
        }
    }

    /// Pairs each parameter with the OID stored at the same position.
    ///
    /// Debug builds assert that `params` and `param_oids` have the same
    /// length. Without that assertion the pairing stops at the shorter of
    /// the two sequences.
    fn pair_with<'a>(
        &self,
        params: &'a [&'a (dyn ToSql + Sync)],
    ) -> Vec<(&'a (dyn ToSql + Sync), Oid)> {
        debug_assert_eq!(
            self.param_oids.len(),
            params.len(),
            "macro emitted mismatched parameter and OID counts",
        );
        let values = params.iter().copied();
        let oids = self.param_oids.iter().copied();
        values.zip(oids).collect()
    }
}
/// A typed query together with its bound parameters, ready to be run on a
/// [`Connection`].
///
/// `T` is the row type produced through [`FromRow`]; it is carried only as
/// `PhantomData`.
pub struct QueryExecution<'a, T> {
    handle: TypedQueryHandle<'a>,
    params: Vec<&'a (dyn ToSql + Sync)>,
    _t: PhantomData<T>,
}

impl<'a, T: FromRow> QueryExecution<'a, T> {
    /// Bundles `handle` with the parameter values it will be executed with.
    pub fn new(handle: TypedQueryHandle<'a>, params: Vec<&'a (dyn ToSql + Sync)>) -> Self {
        QueryExecution {
            handle,
            params,
            _t: PhantomData,
        }
    }

    /// Reports this query through `crate::__priv::emit_query_macro`.
    ///
    /// Every execution method calls this first, before touching the driver.
    fn announce(&self, conn: &mut Connection) {
        let handle = &self.handle;
        crate::__priv::emit_query_macro(conn, handle.macro_name, handle.query_id, handle.sql);
    }

    /// Runs the query with `Connection::query_typed_one` and decodes the
    /// returned row with `T::from_row`.
    ///
    /// Driver and decoding errors are propagated.
    pub async fn fetch_one(self, conn: &mut Connection) -> Result<T> {
        self.announce(conn);
        let bound = self.handle.pair_with(&self.params);
        let row = conn.query_typed_one(self.handle.sql, &bound).await?;
        T::from_row(&row)
    }

    /// Runs the query with `Connection::query_typed_opt`.
    ///
    /// Yields `Ok(None)` when the driver returns no row, otherwise the row
    /// decoded with `T::from_row`.
    pub async fn fetch_optional(self, conn: &mut Connection) -> Result<Option<T>> {
        self.announce(conn);
        let bound = self.handle.pair_with(&self.params);
        let maybe_row = conn.query_typed_opt(self.handle.sql, &bound).await?;
        maybe_row.map(|row| T::from_row(&row)).transpose()
    }

    /// Runs the query with `Connection::query_typed` and decodes every row.
    ///
    /// Decoding stops at the first row that fails, and that error is returned.
    pub async fn fetch_all(self, conn: &mut Connection) -> Result<Vec<T>> {
        self.announce(conn);
        let bound = self.handle.pair_with(&self.params);
        let rows = conn.query_typed(self.handle.sql, &bound).await?;
        rows.iter().map(|row| T::from_row(row)).collect()
    }

    /// Runs the statement with `Connection::execute_typed` and returns the
    /// `u64` it reports (presumably the affected-row count — confirm against
    /// the driver).
    pub async fn execute(self, conn: &mut Connection) -> Result<u64> {
        self.announce(conn);
        let bound = self.handle.pair_with(&self.params);
        let outcome = conn.execute_typed(self.handle.sql, &bound).await?;
        Ok(outcome)
    }
}
/// Adapter that runs an inner [`QueryExecution`] producing `S` and maps each
/// decoded value to `T` with a plain function pointer.
pub struct ScalarExecution<'a, T, S> {
    inner: QueryExecution<'a, S>,
    extract: fn(S) -> T,
}

impl<'a, T, S: FromRow> ScalarExecution<'a, T, S> {
    /// Wraps `inner`; `extract` is applied to every value it yields.
    pub fn new(inner: QueryExecution<'a, S>, extract: fn(S) -> T) -> Self {
        ScalarExecution { inner, extract }
    }

    /// Delegates to `QueryExecution::fetch_one` and maps the result.
    pub async fn fetch_one(self, conn: &mut Connection) -> Result<T> {
        let ScalarExecution { inner, extract } = self;
        inner.fetch_one(conn).await.map(extract)
    }

    /// Delegates to `QueryExecution::fetch_optional`; a missing row stays
    /// `None`, a present one is mapped.
    pub async fn fetch_optional(self, conn: &mut Connection) -> Result<Option<T>> {
        let ScalarExecution { inner, extract } = self;
        let found = inner.fetch_optional(conn).await?;
        Ok(found.map(extract))
    }

    /// Delegates to `QueryExecution::fetch_all` and maps every element,
    /// preserving order.
    pub async fn fetch_all(self, conn: &mut Connection) -> Result<Vec<T>> {
        let ScalarExecution { inner, extract } = self;
        let decoded = inner.fetch_all(conn).await?;
        Ok(decoded.into_iter().map(extract).collect())
    }
}
/// A query run through the connection's untyped entry points (`query_one`,
/// `query_opt`, `query`, `execute`): only SQL text and parameter values are
/// supplied, no parameter OIDs.
pub struct UncheckedExecution<'a, T> {
    /// SQL text passed to the connection unchanged.
    pub sql: &'a str,
    /// Parameter values, in positional order.
    pub params: Vec<&'a (dyn ToSql + Sync)>,
    _t: PhantomData<T>,
}

impl<'a, T: FromRow> UncheckedExecution<'a, T> {
    /// Bundles `sql` with the parameter values it will be executed with.
    pub fn new(sql: &'a str, params: Vec<&'a (dyn ToSql + Sync)>) -> Self {
        UncheckedExecution {
            sql,
            params,
            _t: PhantomData,
        }
    }

    /// Reports this query through `crate::__priv::emit_query_macro` using the
    /// fixed labels `"query_unchecked"` / `"unchecked"`.
    fn announce(&self, conn: &mut Connection) {
        crate::__priv::emit_query_macro(conn, "query_unchecked", "unchecked", self.sql);
    }

    /// Runs the query with `Connection::query_one` and decodes the row with
    /// `T::from_row`.
    pub async fn fetch_one(self, conn: &mut Connection) -> Result<T> {
        self.announce(conn);
        let row = conn.query_one(self.sql, &self.params).await?;
        T::from_row(&row)
    }

    /// Runs the query with `Connection::query_opt`; `Ok(None)` when no row is
    /// returned, otherwise the decoded row.
    pub async fn fetch_optional(self, conn: &mut Connection) -> Result<Option<T>> {
        self.announce(conn);
        let maybe_row = conn.query_opt(self.sql, &self.params).await?;
        maybe_row.map(|row| T::from_row(&row)).transpose()
    }

    /// Runs the query with `Connection::query` and decodes every row,
    /// returning the first decoding error if any row fails.
    pub async fn fetch_all(self, conn: &mut Connection) -> Result<Vec<T>> {
        self.announce(conn);
        let rows = conn.query(self.sql, &self.params).await?;
        rows.iter().map(|row| T::from_row(row)).collect()
    }

    /// Runs the statement with `Connection::execute` and returns the `u64`
    /// it reports (presumably the affected-row count — confirm against the
    /// driver).
    pub async fn execute(self, conn: &mut Connection) -> Result<u64> {
        self.announce(conn);
        let outcome = conn.execute(self.sql, &self.params).await?;
        Ok(outcome)
    }
}
/// One query queued for `PipelineExecution::run`, with its parameter values
/// already serialized to bytes.
pub struct PipelineQuerySpec<'q> {
/// SQL text; copied into an owned `String` when the spec is added to the batch.
pub sql: &'q str,
/// Parameter type OIDs; each is converted to `u32` when the spec is added to the batch.
pub param_oids: Vec<Oid>,
/// Serialized parameter values, passed to the batch as-is. Same shape as the
/// output of `encode_params`, where `None` stands for a NULL parameter.
pub encoded_params: Vec<Option<Vec<u8>>>,
/// Label forwarded to `crate::__priv::emit_query_macro` before the query is queued.
pub macro_name: &'static str,
/// Identifier forwarded to `crate::__priv::emit_query_macro` together with `macro_name`.
pub query_id: &'static str,
}
pub struct PipelineExecution<'q> {
pub specs: Vec<PipelineQuerySpec<'q>>,
}
impl<'q> PipelineExecution<'q> {
pub fn new(specs: Vec<PipelineQuerySpec<'q>>) -> Self {
Self { specs }
}
pub async fn run(self, conn: &mut Connection) -> Result<Vec<QueryResult>> {
let mut batch = conn.pipeline();
for spec in self.specs {
crate::__priv::emit_query_macro(conn, spec.macro_name, spec.query_id, spec.sql);
let oids: Vec<u32> = spec.param_oids.iter().map(|o| u32::from(*o)).collect();
batch.add(spec.sql.to_string(), oids, spec.encoded_params);
}
Ok(conn.execute_pipeline(batch).await?)
}
}
/// Serializes each parameter into its own byte vector.
///
/// A parameter whose `is_null()` is true becomes `None`. Every other
/// parameter is written with `to_sql` into a fresh `BytesMut` whose contents
/// are copied out as `Some(bytes)`. Output order matches `params`.
///
/// # Errors
///
/// Returns the first error produced by `to_sql`; later parameters are not
/// encoded.
pub fn encode_params(params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Option<Vec<u8>>>> {
    use bytes::BytesMut;

    let mut encoded = Vec::with_capacity(params.len());
    for param in params {
        let slot = if param.is_null() {
            None
        } else {
            let mut scratch = BytesMut::new();
            param.to_sql(&mut scratch)?;
            Some(scratch.to_vec())
        };
        encoded.push(slot);
    }
    Ok(encoded)
}
/// Implements [`FromRow`] for one tuple arity.
///
/// Invoked as `impl_from_row_tuple!(T0 at 0, T1 at 1, ...)`: each
/// `Type at index` pair names a tuple element and the column index it is read
/// from with `Row::try_get`. Columns are read left to right; the first driver
/// error is wrapped in `crate::Error::Driver` and returned.
macro_rules! impl_from_row_tuple {
    ($($col:ident at $pos:tt),+ $(,)?) => {
        impl<$($col: FromSql),+> FromRow for ($($col,)+) {
            fn from_row(row: &Row) -> Result<Self> {
                let decoded = (
                    $(row.try_get::<$col>($pos).map_err(crate::Error::Driver)?,)+
                );
                Ok(decoded)
            }
        }
    };
}
// `FromRow` implementations for tuples of arity 1 through 16. Element `Tn`
// is read from column index `n`, so a tuple's element order follows the
// column order of the row.
impl_from_row_tuple!(T0 at 0);
impl_from_row_tuple!(T0 at 0, T1 at 1);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7, T8 at 8);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7, T8 at 8, T9 at 9);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7, T8 at 8, T9 at 9, T10 at 10);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7, T8 at 8, T9 at 9, T10 at 10, T11 at 11);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7, T8 at 8, T9 at 9, T10 at 10, T11 at 11, T12 at 12);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7, T8 at 8, T9 at 9, T10 at 10, T11 at 11, T12 at 12, T13 at 13);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7, T8 at 8, T9 at 9, T10 at 10, T11 at 11, T12 at 12, T13 at 13, T14 at 14);
impl_from_row_tuple!(T0 at 0, T1 at 1, T2 at 2, T3 at 3, T4 at 4, T5 at 5, T6 at 6, T7 at 7, T8 at 8, T9 at 9, T10 at 10, T11 at 11, T12 at 12, T13 at 13, T14 at 14, T15 at 15);