mod delete;
mod select;
mod table;
mod transaction;
mod update;
pub use table::{Column, TypedColumn};
use std::{
future::{Future, IntoFuture},
marker::PhantomData,
ops::{BitAnd, BitOr, Not},
pin::Pin,
};
use async_trait::async_trait;
use tokio_postgres::{types::ToSql, Row, Transaction as PgTransaction};
use crate::{fetch_client, DpClient, DpTransaction, Error};
pub use delete::Delete;
pub use select::Select;
pub use transaction::*;
pub use update::{NoneSet, SomeSet, Update};
/// A piece of SQL that can append itself to a [`Query`] that is being built.
///
/// The implementors in this module push their statement text onto the
/// buffer's string and hand their bound parameters over to the buffer.
#[doc(hidden)]
pub trait PushChunk<'a> {
/// Appends this chunk to `buffer`.
///
/// Takes `&mut self` so an implementor may move its parameters out of
/// itself instead of copying them.
fn push_to_buffer<T>(&mut self, buffer: &mut Query<'a, T>);
}
/// Something a statement can be run against.
///
/// This module implements it for `&DpClient` and `&DpTransaction`, so the
/// same query can be executed with or without a transaction.
#[doc(hidden)]
#[async_trait]
pub trait Executor {
/// Runs `stmt` with the positional `params` and returns the resulting rows.
async fn query(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error>;
/// Runs `stmt` with the positional `params` and returns the `u64` reported
/// by the underlying driver (tokio_postgres documents this as the number of
/// rows modified).
async fn execute(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error>;
}
/// A statement that can be run and whose result has a known type.
#[async_trait]
pub trait Executable {
/// The value produced by a successful run.
type Output;
/// Runs the statement on a client obtained from `fetch_client()`.
///
/// # Errors
/// Returns the error from `fetch_client()` if no client could be obtained,
/// otherwise whatever [`Executable::exec_with`] returns.
async fn exec(&self) -> Result<Self::Output, crate::Error> {
let client = fetch_client().await?;
self.exec_with(&client).await
}
/// Runs the statement on the given executor (for example a client or a
/// transaction reference).
async fn exec_with(
&self,
client: impl Executor + Send + Sync,
) -> Result<Self::Output, crate::Error>;
}
/// A SQL statement together with its borrowed parameters.
///
/// Field `0` is the statement text, field `1` the positional parameters, and
/// field `2` a zero-sized marker for `T`. `T` is the result type; the
/// [`Executable`] impls in this module exist for `Vec<_>`, `Option<_>` and
/// `u64`, and `T` selects between them.
pub struct Query<'a, T = Vec<Row>>(pub String, Vec<&'a (dyn ToSql + Sync)>, PhantomData<T>);
pub trait ToQuery<'a, T>: PushChunk<'a> {
fn to_query(&mut self) -> Query<'a, T> {
let mut query = Query::default();
self.push_to_buffer(&mut query);
query.0 = replace_question_marks(query.0);
query
}
}
/// A raw SQL fragment (field `0`) paired with the parameters it binds
/// (field `1`).
#[doc(hidden)]
pub struct SqlChunk<'a>(pub String, pub Vec<&'a (dyn ToSql + Sync)>);
/// Pushes every chunk of `vec` into `buffer`, writing `sep` between
/// consecutive chunks (never before the first or after the last).
///
/// Does nothing when `vec` is empty.
fn push_all_with_sep<'a, T, U: PushChunk<'a>>(
    vec: &mut Vec<U>,
    buffer: &mut Query<'a, T>,
    sep: &str,
) {
    let mut chunks = vec.iter_mut();
    if let Some(first) = chunks.next() {
        first.push_to_buffer(buffer);
        // Every remaining chunk is preceded by the separator.
        for chunk in chunks {
            buffer.0.push_str(sep);
            chunk.push_to_buffer(buffer);
        }
    }
}
/// A composable condition tree that renders to SQL text plus bound parameters.
pub enum Where<'a> {
/// Conjunction of the sub-conditions; rendered as `(a) AND (b) …`.
And(Vec<Where<'a>>),
/// Disjunction of the sub-conditions; rendered as `(a) OR (b) …`.
Or(Vec<Where<'a>>),
/// Negation of the inner condition; rendered as `NOT (inner)`.
Not(Box<Where<'a>>),
/// A literal SQL fragment together with its parameters.
Raw(SqlChunk<'a>),
/// No condition at all; renders nothing.
Empty,
}
/// Rewrites every `?` in `stmt` into a numbered placeholder (`$1`, `$2`, …),
/// counting from 1 in order of appearance.
///
/// Every `?` is replaced, wherever it occurs in the text. A statement
/// without any `?` is returned unchanged.
fn replace_question_marks(stmt: String) -> String {
    // Head-room for the digits that replace the single-character markers.
    const RESERVED: usize = 9;
    let mut out = String::with_capacity(stmt.len() + RESERVED);
    // Splitting on `?` yields the text around the markers; a placeholder
    // belongs in front of every piece except the first.
    let mut pieces = stmt.split('?');
    if let Some(head) = pieces.next() {
        out.push_str(head);
    }
    for (ordinal, piece) in (1usize..).zip(pieces) {
        out.push('$');
        out.push_str(&ordinal.to_string());
        out.push_str(piece);
    }
    out
}
impl<'a, T> Default for Query<'a, T> {
fn default() -> Self {
Self("".into(), vec![], PhantomData::<T>)
}
}
impl<'a, T> Query<'a, T> {
    /// Creates a query from a statement written with `?` markers and the
    /// parameters bound to them.
    ///
    /// The markers are rewritten to `$1`, `$2`, … by `replace_question_marks`
    /// before the text is stored; `params` is stored as given.
    pub fn new(stmt: String, params: Vec<&'a (dyn ToSql + Sync)>) -> Query<'a, T> {
        let numbered = replace_question_marks(stmt);
        Self(numbered, params, PhantomData)
    }
}
impl<'a> PushChunk<'a> for SqlChunk<'a> {
fn push_to_buffer<T>(&mut self, buffer: &mut Query<'a, T>) {
buffer.0.push_str(&self.0);
buffer.1.append(&mut self.1);
}
}
/// Runs statements through a `DpTransaction` by forwarding to
/// `tokio_postgres::Transaction` (imported here as `PgTransaction`).
#[async_trait]
impl<'a> Executor for &DpTransaction<'a> {
/// Forwards to `PgTransaction::query`, converting the error with `Error::from`.
async fn query(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error> {
// Fully qualified so the driver's method is called rather than this trait's `query`.
PgTransaction::query(self, stmt, params)
.await
.map_err(Error::from)
}
/// Forwards to `PgTransaction::execute`, converting the error with `Error::from`.
async fn execute(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
// Fully qualified for the same reason as in `query`.
PgTransaction::execute(self, stmt, params)
.await
.map_err(Error::from)
}
}
/// Runs statements through a `DpClient` by forwarding to the value it
/// dereferences to.
#[async_trait]
impl Executor for &DpClient {
/// Forwards to the inner `query`, converting the error with `Error::from`.
async fn query(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error> {
// `self` is `&&DpClient`: two derefs reach `DpClient`, the third goes
// through its `Deref` target (presumably the tokio_postgres client —
// confirm against `DpClient`'s definition).
(***self).query(stmt, params).await.map_err(Error::from)
}
/// Forwards to the inner `execute`, converting the error with `Error::from`.
async fn execute(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
// Same triple deref as in `query`.
(***self).execute(stmt, params).await.map_err(Error::from)
}
}
/// Multi-row queries: every returned row is converted into a `T`.
#[async_trait]
impl<'a, T> Executable for Query<'a, Vec<T>>
where
    T: TryFrom<Row, Error = crate::Error> + Send + Sync,
{
    type Output = Vec<T>;

    /// Runs the statement and converts each row with `T::try_from`.
    ///
    /// # Errors
    /// Returns the executor's error, or the first row-conversion error
    /// (conversion stops at that row).
    async fn exec_with(
        &self,
        client: impl Executor + Send + Sync,
    ) -> Result<Self::Output, crate::Error> {
        let fetched = client.query(&self.0, &self.1).await?;
        let mut items = Vec::with_capacity(fetched.len());
        for row in fetched {
            items.push(T::try_from(row)?);
        }
        Ok(items)
    }
}
/// Optional single-row queries: only the first returned row is converted.
#[async_trait]
impl<'a, T> Executable for Query<'a, Option<T>>
where
    T: TryFrom<Row, Error = crate::Error> + Send + Sync,
{
    type Output = Option<T>;

    /// Runs the statement and converts the first row, if any, with
    /// `T::try_from`. Any further rows are discarded unconverted.
    ///
    /// # Errors
    /// Returns the executor's error, or the conversion error of the first row.
    async fn exec_with(
        &self,
        client: impl Executor + Send + Sync,
    ) -> Result<Self::Output, crate::Error> {
        let fetched = client.query(&self.0, &self.1).await?;
        match fetched.into_iter().next() {
            Some(row) => T::try_from(row).map(Some),
            None => Ok(None),
        }
    }
}
/// Statements run through `Executor::execute`, whose result is a `u64` count.
#[async_trait]
impl<'a> Executable for Query<'a, u64> {
    type Output = u64;

    /// Runs the statement via `Executor::execute` and returns its count.
    ///
    /// # Errors
    /// Returns the executor's error unchanged.
    async fn exec_with(
        &self,
        client: impl Executor + Send + Sync,
    ) -> Result<Self::Output, crate::Error> {
        let affected = client.execute(&self.0, &self.1).await?;
        Ok(affected)
    }
}
/// Lets a [`Query`] be `.await`ed directly.
///
/// Awaiting consumes the query and runs [`Executable::exec`] on it.
impl<'a, T: Send + 'a> IntoFuture for Query<'a, T>
where
Query<'a, T>: Executable<Output = T>,
T: Sync,
{
// Boxed trait object bounded by `'a`, the lifetime of the borrowed
// parameters. Note that it carries no `Send` bound.
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + 'a>>;
type Output = Result<T, crate::Error>;
fn into_future(self) -> Self::IntoFuture {
// `async move` takes ownership of the query, so the returned future
// owns everything it needs apart from the `'a` parameter borrows.
Box::pin(async move { self.exec().await })
}
}
impl<'a> Where<'a> {
pub(crate) fn new(expr: String, params: Vec<&'a (dyn ToSql + Sync)>) -> Where<'a> {
Self::Raw(SqlChunk(expr, params))
}
pub(crate) fn is_empty(&self) -> bool {
use Where::*;
match self {
Empty => true,
And(vec) => vec.iter().all(|i| i.is_empty()),
Or(vec) => vec.iter().all(|i| i.is_empty()),
Not(inner) => inner.is_empty(),
Raw(chunk) => chunk.0.is_empty(),
}
}
pub fn and(self, other: Where<'a>) -> Where<'a> {
self.bitand(other)
}
pub fn or(self, other: Where<'a>) -> Where<'a> {
self.bitor(other)
}
}
impl<'a> Default for Where<'a> {
fn default() -> Self {
Where::new("".into(), vec![])
}
}
impl<'a> BitAnd for Where<'a> {
    type Output = Where<'a>;

    /// Combines two conditions with `AND`.
    ///
    /// * If either side is empty (per `is_empty()`), the other side is
    ///   returned unchanged. This covers not only the `Empty` variant but
    ///   also `Where::default()` (an empty raw chunk) and negations or groups
    ///   of empty conditions, which would otherwise be wrapped into the
    ///   conjunction and render as an empty `()` group.
    /// * Existing `And` lists are extended in place rather than nested:
    ///   `And & And` concatenates both lists, `And & x` appends `x`, and
    ///   `x & And` appends `x` to the right-hand list.
    /// * Otherwise a new two-element `And` is created.
    fn bitand(self, other: Self) -> Self::Output {
        use Where::*;
        if self.is_empty() {
            return other;
        }
        if other.is_empty() {
            return self;
        }
        match (self, other) {
            (And(mut lhs), And(mut rhs)) => {
                lhs.append(&mut rhs);
                And(lhs)
            }
            (And(mut lhs), rhs) => {
                lhs.push(rhs);
                And(lhs)
            }
            (lhs, And(mut rhs)) => {
                rhs.push(lhs);
                And(rhs)
            }
            (lhs, rhs) => And(vec![lhs, rhs]),
        }
    }
}
impl<'a> BitOr for Where<'a> {
    type Output = Where<'a>;

    /// Combines two conditions with `OR`.
    ///
    /// * If either side is empty (per `is_empty()`), the other side is
    ///   returned unchanged, so empty raw chunks or negations of empty
    ///   conditions never end up as an empty `()` group in the output.
    /// * Existing `Or` lists are extended in place rather than nested:
    ///   `Or | Or` concatenates both lists, `Or | x` appends `x`, and
    ///   `x | Or` appends `x` to the right-hand list.
    /// * Otherwise a new two-element `Or` is created.
    ///
    /// Only a right-hand `Or` is flattened into a left-hand `Or`. A
    /// right-hand `And` is appended as a single operand: splicing its
    /// children into the disjunction would turn `a | (b & c)` into
    /// `a | b | c`, which is a different condition.
    fn bitor(self, other: Self) -> Self::Output {
        use Where::*;
        if self.is_empty() {
            return other;
        }
        if other.is_empty() {
            return self;
        }
        match (self, other) {
            (Or(mut lhs), Or(mut rhs)) => {
                lhs.append(&mut rhs);
                Or(lhs)
            }
            (Or(mut lhs), rhs) => {
                lhs.push(rhs);
                Or(lhs)
            }
            (lhs, Or(mut rhs)) => {
                rhs.push(lhs);
                Or(rhs)
            }
            (lhs, rhs) => Or(vec![lhs, rhs]),
        }
    }
}
impl<'a> Not for Where<'a> {
    type Output = Where<'a>;

    /// Negates the condition.
    ///
    /// Negating an existing negation unwraps it instead of nesting a second
    /// `Not`; anything else is boxed inside a new `Not`.
    fn not(self) -> Self::Output {
        use Where::*;
        match self {
            Not(operand) => *operand,
            plain => Not(Box::new(plain)),
        }
    }
}
impl<'a> PushChunk<'a> for Where<'a> {
    /// Renders the condition tree into `buffer`.
    ///
    /// * An empty condition (per `is_empty()`) writes nothing.
    /// * `Raw` writes its fragment and moves its parameters into the buffer.
    /// * `Not` writes `NOT (inner)`.
    /// * `And` / `Or` write `(a) AND (b) …` / `(a) OR (b) …`.
    ///
    /// Empty children of an `And`/`Or` are removed before rendering, since
    /// each would otherwise produce an empty `()` group, which is not valid
    /// SQL. At least one child always remains, because a group whose
    /// children are all empty is itself empty and was handled by the early
    /// return.
    fn push_to_buffer<T>(&mut self, buffer: &mut Query<'a, T>) {
        use Where::*;
        if self.is_empty() {
            return;
        }
        match self {
            Raw(chunk) => {
                chunk.push_to_buffer(buffer);
            }
            Not(inner) => {
                buffer.0.push_str("NOT (");
                inner.push_to_buffer(buffer);
                buffer.0.push(')');
            }
            And(vec) => {
                vec.retain(|child| !child.is_empty());
                buffer.0.push('(');
                push_all_with_sep(vec, buffer, ") AND (");
                buffer.0.push(')');
            }
            Or(vec) => {
                vec.retain(|child| !child.is_empty());
                buffer.0.push('(');
                push_all_with_sep(vec, buffer, ") OR (");
                buffer.0.push(')');
            }
            // Unreachable in practice: `Empty` is caught by the early return.
            Empty => (),
        }
    }
}