use std::borrow::Cow;
use std::marker::PhantomData;
pub use either::Either;
use crate::arguments::{Arguments, IntoArguments};
use crate::database::{Database, HasArguments, HasStatement, HasStatementCache};
use crate::encode::Encode;
use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::io::chan_stream::ChanStream;
use crate::statement::Statement;
use crate::types::Type;
use crate::chan_stream;
use crate::io::chan_stream::TryStream;
/// A SQL query together with the arguments that will be sent alongside it.
///
/// Instances are normally created through [`query`], [`query_with`],
/// [`query_statement`] or [`query_statement_with`].
#[must_use = "query must be executed to affect database"]
pub struct Query<DB, A>
where
    DB: Database,
{
    /// What to run: `Either::Left` holds raw SQL text, `Either::Right` holds a
    /// database statement object whose SQL is read through `Statement::sql`.
    pub statement: Either<String, <DB as HasStatement>::Statement>,
    /// Arguments for the query. `Execute::take_arguments` moves them out and
    /// leaves `None` behind.
    pub arguments: Option<A>,
    /// Zero-sized marker that ties the query to its database type.
    pub database: PhantomData<DB>,
    /// Flag reported to executors through `Execute::persistent`.
    pub persistent: bool,
}
/// A [`Query`] paired with a closure that is applied to every row it produces.
///
/// Created by [`Query::map`] and [`Query::try_map`].
#[must_use = "query must be executed to affect database"]
pub struct Map<DB, F, A>
where
    DB: Database,
{
    /// The wrapped query; this is what is handed to the executor.
    inner: Query<DB, A>,
    /// Closure invoked on each fetched row.
    mapper: F,
}
/// Allows a [`Query`] to be handed directly to an executor.
impl<'q, DB, A> Execute<'q, DB> for Query<DB, A>
where
    DB: Database,
    A: Send + IntoArguments<'q, DB>,
{
    /// Returns the SQL text, regardless of whether it is stored as a raw
    /// string or inside a statement object.
    #[inline]
    fn sql(&self) -> &str {
        match &self.statement {
            Either::Left(raw) => raw.as_str(),
            Either::Right(prepared) => prepared.sql(),
        }
    }

    /// Returns the statement object when the query was built from one, and
    /// `None` when it was built from raw SQL text.
    fn statement(&self) -> Option<&<DB as HasStatement>::Statement> {
        if let Either::Right(prepared) = &self.statement {
            Some(prepared)
        } else {
            None
        }
    }

    /// Moves the arguments out of the query (leaving `None` behind) and
    /// converts them with `IntoArguments::into_arguments`.
    #[inline]
    fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
        let pending = self.arguments.take()?;
        Some(IntoArguments::into_arguments(pending))
    }

    /// Returns the query's `persistent` flag.
    #[inline]
    fn persistent(&self) -> bool {
        self.persistent
    }
}
impl<'q, DB> Query<DB, <DB as HasArguments<'q>>::Arguments>
where
    DB: Database,
{
    /// Appends `value` to the query's argument list and returns the query.
    ///
    /// If the arguments have already been taken (the slot is `None`), the
    /// value is dropped and the query is returned unchanged.
    pub fn bind<T>(mut self, value: T) -> Self
    where
        T: 'q + Send + Encode<'q, DB> + Type<DB>,
    {
        if let Some(args) = self.arguments.as_mut() {
            args.add(value);
        }
        self
    }

    /// Replaces the query's SQL text with `sql`, whether the text lives in a
    /// raw string or inside a statement object.
    pub fn reset_sql(mut self, sql: String) -> Self {
        match self.statement {
            Either::Left(ref mut raw) => *raw = sql,
            Either::Right(ref mut prepared) => *prepared.sql_mut() = sql,
        }
        self
    }

    /// Appends `sql` to the end of the query's current SQL text.
    pub fn push_sql(mut self, sql: &str) -> Self {
        match self.statement {
            Either::Left(ref mut raw) => raw.push_str(sql),
            Either::Right(ref mut prepared) => prepared.sql_mut().push_str(sql),
        }
        self
    }
}
impl<'q, DB: Database + HasStatementCache, A> Query<DB, A> {
    /// Sets the `persistent` flag that `Execute::persistent` later reports.
    ///
    /// Only available when the database implements `HasStatementCache`.
    /// NOTE(review): presumably the flag decides whether the executor keeps
    /// the prepared statement cached — confirm against the executor code.
    pub fn persistent(mut self, value: bool) -> Self {
        self.persistent = value;
        self
    }
}
impl<'q, DB, A: Send> Query<DB, A>
where
DB: Database,
A: 'q + IntoArguments<'q, DB>,
{
#[inline]
pub fn map<F, O>(
self,
mut f: F,
) -> Map<DB, impl FnMut(DB::Row) -> Result<O, Error> + Send, A>
where
F: FnMut(DB::Row) -> O + Send,
{
self.try_map(move |row| Ok(f(row)))
}
#[inline]
pub fn try_map<F, O>(self, f: F) -> Map<DB, F, A>
where
F: FnMut(DB::Row) -> Result<O, Error> + Send,
{
Map {
inner: self,
mapper: f,
}
}
#[inline]
pub fn execute<'c, E>(self, mut executor: E) -> Result<DB::QueryResult, Error>
where E: Executor<Database=DB>,
{
executor.execute(self)
}
#[inline]
pub fn execute_many<'c, E>(
self,
mut executor: E,
) -> ChanStream<DB::QueryResult>
where E: Executor<Database=DB>,
{
executor.execute_many(self)
}
#[inline]
pub fn fetch<'c, E>(self, mut executor: E) -> ChanStream<DB::Row>
where E: Executor<Database=DB>,
{
executor.fetch(self)
}
#[inline]
pub fn fetch_many<'c, E>(
self,
mut executor: E,
) -> ChanStream<Either<DB::QueryResult, DB::Row>>
where E: Executor<Database=DB>,
{
executor.fetch_many(self)
}
#[inline]
pub fn fetch_all<'c, E>(self, mut executor: E) -> Result<Vec<DB::Row>, Error>
where E: Executor<Database=DB>,
{
executor.fetch_all(self)
}
#[inline]
pub fn fetch_one<'c, E>(self, mut executor: E) -> Result<DB::Row, Error>
where E: Executor<Database=DB>,
{
executor.fetch_one(self)
}
#[inline]
pub fn fetch_optional<'c, E>(self, mut executor: E) -> Result<Option<DB::Row>, Error>
where E: Executor<Database=DB>,
{
executor.fetch_optional(self)
}
}
/// Allows a [`Map`] to be handed directly to an executor; every method defers
/// to the wrapped query.
impl<'q, DB, F: Send, A: Send> Execute<'q, DB> for Map<DB, F, A>
where
    DB: Database,
    A: IntoArguments<'q, DB>,
{
    /// Returns the SQL text of the wrapped query.
    #[inline]
    fn sql(&self) -> &str {
        self.inner.sql()
    }

    /// Returns the wrapped query's statement object, if it has one.
    #[inline]
    fn statement(&self) -> Option<&<DB as HasStatement>::Statement> {
        self.inner.statement()
    }

    /// Moves the arguments out of the wrapped query.
    #[inline]
    fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
        self.inner.take_arguments()
    }

    /// Reports the wrapped query's `persistent` flag.
    ///
    /// The flag is read directly rather than derived from
    /// `arguments.is_some()`: the arguments slot is emptied by
    /// `take_arguments`, so deriving the answer from it would ignore a
    /// `Query::persistent(false)` request and would turn `false` as soon as
    /// the arguments had been taken.
    #[inline]
    fn persistent(&self) -> bool {
        self.inner.persistent
    }
}
impl<'q, DB, F, O, A> Map<DB, F, A>
where
DB: Database,
F: FnMut(DB::Row) -> Result<O, Error> + Send,
O: Send,
A: 'q + Send + IntoArguments<'q, DB>,
{
#[inline]
pub fn map<G, P>(
self,
mut g: G,
) -> Map<DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
where
G: FnMut(O) -> P + Send,
{
self.try_map(move |data| Ok(g(data)))
}
#[inline]
pub fn try_map<G, P>(
self,
mut g: G,
) -> Map<DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
where
G: FnMut(O) -> Result<P, Error> + Send
{
let mut f = self.mapper;
Map {
inner: self.inner,
mapper: move |row| f(row).and_then(|o| g(o)),
}
}
pub fn fetch<'c, E>(self, mut executor: E) -> ChanStream<O>
where E: 'c + Executor<Database=DB>,
DB: 'c,
F: 'c,
O: 'c,
{
self.fetch_many(executor)
.map(|step| {
match step {
Either::Left(_) => None,
Either::Right(o) => Some(o),
}
})
}
pub fn fetch_many<'c, E>(
mut self,
mut executor: E,
) -> ChanStream<Either<DB::QueryResult, O>>
where E: 'c + Executor<Database=DB>,
DB: 'c,
F: 'c,
O: 'c,
{
chan_stream! {
let mut s = executor.fetch_many(self.inner);
while let Some(v) = s.try_next()? {
r#yield!(match v {
Either::Left(v) => Either::Left(v),
Either::Right(row) => {
Either::Right((self.mapper)(row)?)
}
});
}
Ok(())
}
}
pub fn fetch_all<'c, E>(self, mut executor: E) -> Result<Vec<O>, Error>
where E: 'c + Executor<Database=DB>,
DB: 'c,
F: 'c,
O: 'c,
{
self.fetch(executor).collect(|it| {
Some(Ok(it))
})
}
pub fn fetch_one<'c, E>(self, mut executor: E) -> Result<O, Error>
where E: 'c + Executor<Database=DB>,
DB: 'c,
F: 'c,
O: 'c,
{
self.fetch_optional(executor)
.and_then(|row| match row {
Some(row) => Ok(row),
None => Err(Error::RowNotFound),
})
}
pub fn fetch_optional<'c, E>(mut self, mut executor: E) -> Result<Option<O>, Error>
where E: 'c + Executor<Database=DB>,
DB: 'c,
F: 'c,
O: 'c,
{
let row = executor.fetch_optional(self.inner)?;
if let Some(row) = row {
(self.mapper)(row).map(Some)
} else {
Ok(None)
}
}
}
/// Builds a [`Query`] around an existing statement object.
///
/// The argument list starts out default-constructed and the `persistent`
/// flag starts as `true`.
pub fn query_statement<'q, DB>(
    statement: <DB as HasStatement>::Statement,
) -> Query<DB, <DB as HasArguments<'q>>::Arguments>
where
    DB: Database,
{
    let statement = Either::Right(statement);
    Query {
        statement,
        arguments: Some(Default::default()),
        persistent: true,
        database: PhantomData,
    }
}
/// Builds a [`Query`] around an existing statement object, using the
/// caller-supplied `arguments`.
///
/// The `persistent` flag starts as `true`.
pub fn query_statement_with<'q, DB, A>(
    statement: <DB as HasStatement>::Statement,
    arguments: A,
) -> Query<DB, A>
where
    DB: Database,
    A: IntoArguments<'q, DB>,
{
    let statement = Either::Right(statement);
    Query {
        statement,
        arguments: Some(arguments),
        persistent: true,
        database: PhantomData,
    }
}
/// Builds a [`Query`] from raw SQL text.
///
/// The text is copied into an owned `String`, the argument list starts out
/// default-constructed and the `persistent` flag starts as `true`.
pub fn query<DB>(sql: &str) -> Query<DB, <DB as HasArguments<'_>>::Arguments>
where
    DB: Database,
{
    let statement = Either::Left(sql.to_owned());
    Query {
        statement,
        arguments: Some(Default::default()),
        persistent: true,
        database: PhantomData,
    }
}
/// Builds a [`Query`] from raw SQL text, using the caller-supplied
/// `arguments`.
///
/// The text is copied into an owned `String` and the `persistent` flag starts
/// as `true`.
pub fn query_with<'q, DB, A>(sql: &str, arguments: A) -> Query<DB, A>
where
    DB: Database,
    A: IntoArguments<'q, DB>,
{
    let statement = Either::Left(sql.to_owned());
    Query {
        statement,
        arguments: Some(arguments),
        persistent: true,
        database: PhantomData,
    }
}
/// Builds a query with `$crate::query` and binds arguments to it.
///
/// Three forms are accepted:
///
/// * `query!(sql)` — no arguments.
/// * `query!(sql, a, b, ...)` — binds each comma-separated expression in
///   order; a trailing comma is allowed.
/// * `query!(sql; collection)` — iterates `collection` and binds every item
///   in order.
#[macro_export]
macro_rules! query {
    ($sql:expr) => {
        $crate::query($sql)
    };
    // Arguments are captured as whole expressions. Capturing single token
    // trees would split `1 + 2`, `&name` or `user.id` into separate pieces
    // and expand to invalid `bind` calls.
    ($sql:expr, $($arg:expr),* $(,)?) => {
        {
            let q = $crate::query($sql);
            // Rebinding by shadowing avoids an unused `mut` when no argument
            // is given.
            $(let q = q.bind($arg);)*
            q
        }
    };
    ($sql:expr; $vec:expr) => {
        {
            let mut q = $crate::query($sql);
            for x in $vec {
                q = q.bind(x);
            }
            q
        }
    };
}