use std::marker::PhantomData;
use async_stream::try_stream;
use futures_core::Stream;
use futures_util::future::ready;
use futures_util::TryFutureExt;
use crate::arguments::Arguments;
use crate::cursor::{Cursor, HasCursor};
use crate::database::Database;
use crate::encode::Encode;
use crate::executor::{Execute, Executor, RefExecutor};
use crate::row::HasRow;
use crate::types::Type;
/// A SQL statement together with the arguments bound to it so far.
///
/// The statement text is borrowed for `'q`; the bound values live in the
/// database-specific `DB::Arguments` collection.
#[must_use = "query must be executed to affect database"]
pub struct Query<'q, DB: Database> {
    /// Statement text, borrowed from the caller.
    pub(crate) query: &'q str,
    /// Values bound to the statement so far.
    pub(crate) arguments: DB::Arguments,
    /// Marker tying this value to its database type; stores no data.
    database: PhantomData<DB>,
}
/// A [`Query`] paired with a mapper that is applied to each fetched row.
#[must_use = "query must be executed to affect database"]
pub struct Map<'q, DB: Database, F> {
    /// The query whose rows are mapped.
    query: Query<'q, DB>,
    /// Converts a raw row into the caller's output type.
    mapper: F,
}
impl<'q, DB> Execute<'q, DB> for Query<'q, DB>
where
DB: Database,
{
fn into_parts(self) -> (&'q str, Option<DB::Arguments>) {
(self.query, Some(self.arguments))
}
#[doc(hidden)]
fn query_string(&self) -> &'q str {
self.query
}
}
impl<'q, DB> Query<'q, DB>
where
DB: Database,
{
pub fn bind<T>(mut self, value: T) -> Self
where
T: Type<DB>,
T: Encode<DB>,
{
self.arguments.add(value);
self
}
#[doc(hidden)]
pub fn bind_all(self, arguments: DB::Arguments) -> Query<'q, DB> {
Query {
query: self.query,
arguments,
database: PhantomData,
}
}
}
impl<'q, DB: Database> Query<'q, DB> {
    /// Attaches an infallible row mapper to this query.
    ///
    /// The mapper is wrapped so it can be used wherever a [`TryMapRow`] is
    /// expected; the wrapper always reports success.
    pub fn map<F, O>(self, mapper: F) -> Map<'q, DB, impl TryMapRow<DB, Output = O>>
    where
        O: Unpin,
        F: MapRow<DB, Output = O>,
    {
        let adapter = MapRowAdapter(mapper);
        self.try_map(adapter)
    }

    /// Attaches a fallible row mapper to this query.
    pub fn try_map<F>(self, mapper: F) -> Map<'q, DB, F>
    where
        F: TryMapRow<DB>,
    {
        let query = self;
        Map { query, mapper }
    }
}
impl<'q, DB> Query<'q, DB>
where
    DB: Database,
    Self: Execute<'q, DB>,
{
    /// Runs the query on `executor` and returns the `u64` the executor reports.
    // NOTE(review): presumably the number of affected rows — confirm against `Executor::execute`.
    pub async fn execute<E>(self, mut executor: E) -> crate::Result<u64>
    where
        E: Executor<Database = DB>,
    {
        let pending = executor.execute(self);
        pending.await
    }

    /// Hands the query to `executor` and returns the database's cursor type.
    pub fn fetch<'e, E>(self, executor: E) -> <DB as HasCursor<'e, 'q>>::Cursor
    where
        E: RefExecutor<'e, Database = DB>,
    {
        let cursor = executor.fetch_by_ref(self);
        cursor
    }
}
impl<'q, DB, F> Map<'q, DB, F>
where
DB: Database,
Query<'q, DB>: Execute<'q, DB>,
F: TryMapRow<DB>,
{
pub fn fetch<'e: 'q, E>(
mut self,
executor: E,
) -> impl Stream<Item = crate::Result<F::Output>> + Unpin + 'e
where
'q: 'e,
E: RefExecutor<'e, Database = DB> + 'e,
F: 'e,
F::Output: 'e,
{
Box::pin(try_stream! {
let mut cursor = executor.fetch_by_ref(self.query);
while let Some(next) = cursor.next().await? {
let mapped = self.mapper.try_map_row(next)?;
yield mapped;
}
})
}
pub async fn fetch_optional<'e, E>(self, executor: E) -> crate::Result<Option<F::Output>>
where
E: RefExecutor<'e, Database = DB>,
'q: 'e,
{
let mut cursor = executor.fetch_by_ref(self.query);
let mut mapper = self.mapper;
let val = cursor.next().await?;
val.map(|row| mapper.try_map_row(row)).transpose()
}
pub async fn fetch_one<'e, E>(self, executor: E) -> crate::Result<F::Output>
where
E: RefExecutor<'e, Database = DB>,
'q: 'e,
{
self.fetch_optional(executor)
.and_then(|row| match row {
Some(row) => ready(Ok(row)),
None => ready(Err(crate::Error::RowNotFound)),
})
.await
}
pub async fn fetch_all<'e, E>(mut self, executor: E) -> crate::Result<Vec<F::Output>>
where
E: RefExecutor<'e, Database = DB>,
'q: 'e,
{
let mut cursor = executor.fetch_by_ref(self.query);
let mut out = vec![];
while let Some(row) = cursor.next().await? {
out.push(self.mapper.try_map_row(row)?);
}
Ok(out)
}
}
/// A fallible conversion from a database row into an output value.
pub trait TryMapRow<DB>
where
    DB: Database,
{
    /// The value produced for each row.
    type Output: Unpin;

    /// Converts `row` into [`Self::Output`], or reports why it could not.
    fn try_map_row(&mut self, row: <DB as HasRow>::Row) -> crate::Result<Self::Output>;
}
/// An infallible conversion from a database row into an output value.
pub trait MapRow<DB>
where
    DB: Database,
{
    /// The value produced for each row.
    type Output: Unpin;

    /// Converts `row` into [`Self::Output`].
    fn map_row(&mut self, row: <DB as HasRow>::Row) -> Self::Output;
}
/// Wraps a [`MapRow`] so it can be used where a [`TryMapRow`] is required.
struct MapRowAdapter<F>(F);

impl<DB, O, F> TryMapRow<DB> for MapRowAdapter<F>
where
    DB: Database,
    O: Unpin,
    F: MapRow<DB, Output = O>,
{
    type Output = O;

    /// Delegates to the wrapped mapper and always returns `Ok`.
    fn try_map_row(&mut self, row: <DB as HasRow>::Row) -> crate::Result<Self::Output> {
        let MapRowAdapter(inner) = self;
        let mapped = inner.map_row(row);
        Ok(mapped)
    }
}
/// Creates a [`Query`] for `sql` with a default (freshly constructed) argument set.
///
/// The returned query borrows `sql`; values are added afterwards with
/// `Query::bind`.
pub fn query<DB>(sql: &str) -> Query<DB>
where
    DB: Database,
{
    let arguments = Default::default();
    Query {
        query: sql,
        arguments,
        database: PhantomData,
    }
}