use std::future;
use std::future::{ready, Ready};
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use futures_core::stream;
use rorm_sql::value::Value;
use rorm_sql::DBImpl;
use tracing::debug;
use crate::executor::{
AffectedRows, All, DynamicExecutor, Executor, Nothing, One, Optional, QueryStrategy,
QueryStrategyResult, Stream,
};
use crate::internal::any::{AnyExecutor, AnyPool, AnyQueryResult, AnyRow, AnyTransaction};
use crate::transaction::{Transaction, TransactionGuard};
use crate::{Database, Error, Row};
impl<'executor> Executor<'executor> for &'executor mut Transaction {
fn execute<'data, 'result, Q>(
self,
query: String,
values: Vec<Value<'data>>,
) -> Q::Result<'result>
where
'executor: 'result,
'data: 'result,
Q: QueryStrategy,
{
debug!(
target: "rorm_db::executor",
sql = query,
values.len = values.len(),
"Executing statement"
);
Q::execute(&mut self.0, query, values)
}
fn into_dyn(self) -> DynamicExecutor<'executor> {
DynamicExecutor::Transaction(self)
}
fn dialect(&self) -> DBImpl {
match self.0 {
#[cfg(feature = "postgres")]
AnyTransaction::Postgres(_) => DBImpl::Postgres,
#[cfg(feature = "mysql")]
AnyTransaction::MySql(_) => DBImpl::MySQL,
#[cfg(feature = "sqlite")]
AnyTransaction::Sqlite(_) => DBImpl::SQLite,
}
}
type EnsureTransactionFuture = Ready<Result<TransactionGuard<'executor>, Error>>;
fn ensure_transaction(
self,
) -> BoxFuture<'executor, Result<TransactionGuard<'executor>, Error>> {
Box::pin(ready(Ok(TransactionGuard::Borrowed(self))))
}
}
impl<'executor> Executor<'executor> for &'executor Database {
fn execute<'data, 'result, Q>(
self,
query: String,
values: Vec<Value<'data>>,
) -> Q::Result<'result>
where
'executor: 'result,
'data: 'result,
Q: QueryStrategy,
{
debug!(
target: "rorm_db::executor",
sql = query,
values.len = values.len(),
"Executing statement"
);
Q::execute(&self.0, query, values)
}
fn into_dyn(self) -> DynamicExecutor<'executor> {
DynamicExecutor::Database(self)
}
fn dialect(&self) -> DBImpl {
match self.0 {
#[cfg(feature = "postgres")]
AnyPool::Postgres(_) => DBImpl::Postgres,
#[cfg(feature = "mysql")]
AnyPool::MySql(_) => DBImpl::MySQL,
#[cfg(feature = "sqlite")]
AnyPool::Sqlite(_) => DBImpl::SQLite,
}
}
type EnsureTransactionFuture = BoxFuture<'executor, Result<TransactionGuard<'executor>, Error>>;
fn ensure_transaction(
self,
) -> BoxFuture<'executor, Result<TransactionGuard<'executor>, Error>> {
Box::pin(async move { self.start_transaction().await.map(TransactionGuard::Owned) })
}
}
pub trait QueryStrategyImpl: QueryStrategyResult {
fn execute<'query, E>(
executor: E,
query: String,
values: Vec<Value<'query>>,
) -> Self::Result<'query>
where
E: AnyExecutor<'query>;
}
pub type QueryFuture<T> = QueryWrapper<T>;
pub type QueryStream<T> = QueryWrapper<T>;
pub use query_wrapper::QueryWrapper;
use crate::futures_util::{BoxFuture, BoxStream};
mod query_wrapper {
use std::pin::Pin;
use rorm_sql::value::Value;
use crate::internal::any::{AnyExecutor, AnyQuery};
#[doc(hidden)]
#[pin_project::pin_project]
pub struct QueryWrapper<T> {
#[pin]
wrapped: T,
#[allow(dead_code)] query_string: String,
}
impl<'query, T: 'query> QueryWrapper<T> {
pub(crate) fn new_basic(string: String, wrapped: impl FnOnce(&'query str) -> T) -> Self {
let slice: &str = string.as_str();
let slice: &'query str = unsafe { std::mem::transmute(slice) };
Self {
query_string: string,
wrapped: wrapped(slice),
}
}
pub fn new<'data: 'query>(
executor: impl AnyExecutor<'query>,
query_string: String,
values: Vec<Value<'data>>,
execute: impl FnOnce(AnyQuery<'query>) -> T,
) -> Self {
Self::new_basic(query_string, move |query_string| {
let mut query = executor.query(query_string);
for value in values {
crate::internal::utils::bind_param(&mut query, value);
}
execute(query)
})
}
}
impl<T> QueryWrapper<T> {
pub fn project_wrapped(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project().wrapped
}
}
}
impl<F> future::Future for QueryFuture<F>
where
F: future::Future,
{
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project_wrapped().poll(cx)
}
}
impl<S> stream::Stream for QueryStream<S>
where
S: stream::Stream,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project_wrapped().poll_next(cx)
}
}
impl QueryStrategyResult for Nothing {
type Result<'query> = QueryFuture<NothingFuture<'query>>;
}
impl QueryStrategyImpl for Nothing {
fn execute<'query, E>(
executor: E,
query: String,
values: Vec<Value<'query>>,
) -> Self::Result<'query>
where
E: AnyExecutor<'query>,
{
QueryFuture::new(executor, query, values, |query| NothingFuture {
stream: query.fetch_many(),
})
}
}
pub struct NothingFuture<'stream> {
stream: BoxStream<'stream, sqlx::Result<sqlx::Either<AnyQueryResult, AnyRow>>>,
}
impl future::Future for NothingFuture<'_> {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
return Poll::Ready(match ready!(self.stream.as_mut().poll_next(cx)) {
None => Ok(()),
Some(Err(error)) => Err(error.into()),
Some(_either) => continue,
});
}
}
}
impl QueryStrategyResult for AffectedRows {
type Result<'query> = QueryFuture<BoxFuture<'query, Result<u64, Error>>>;
}
impl QueryStrategyImpl for AffectedRows {
fn execute<'query, E>(
executor: E,
query: String,
values: Vec<Value<'query>>,
) -> Self::Result<'query>
where
E: AnyExecutor<'query>,
{
QueryFuture::new(executor, query, values, |query| {
Box::pin(async move { Ok(query.fetch_affected_rows().await?) }) as BoxFuture<_>
})
}
}
impl QueryStrategyResult for One {
type Result<'query> = QueryFuture<BoxFuture<'query, Result<Row, Error>>>;
}
impl QueryStrategyImpl for One {
fn execute<'query, E>(
executor: E,
query: String,
values: Vec<Value<'query>>,
) -> Self::Result<'query>
where
E: AnyExecutor<'query>,
{
QueryFuture::new(executor, query, values, |query| {
Box::pin(async move {
Ok(Row(query
.fetch_optional()
.await?
.ok_or(sqlx::Error::RowNotFound)?))
}) as BoxFuture<_>
})
}
}
impl QueryStrategyResult for Optional {
type Result<'query> = QueryFuture<BoxFuture<'query, Result<Option<Row>, Error>>>;
}
impl QueryStrategyImpl for Optional {
fn execute<'query, E>(
executor: E,
query: String,
values: Vec<Value<'query>>,
) -> Self::Result<'query>
where
E: AnyExecutor<'query>,
{
QueryFuture::new(executor, query, values, |query| {
Box::pin(async move { Ok(query.fetch_optional().await?.map(Row)) }) as BoxFuture<_>
})
}
}
impl QueryStrategyResult for All {
type Result<'query> = QueryFuture<BoxFuture<'query, Result<Vec<Row>, Error>>>;
}
impl QueryStrategyImpl for All {
fn execute<'query, E>(
executor: E,
query: String,
values: Vec<Value<'query>>,
) -> Self::Result<'query>
where
E: AnyExecutor<'query>,
{
QueryFuture::new(executor, query, values, |query| {
Box::pin(async move { Ok(query.fetch_all().await?.into_iter().map(Row).collect()) })
as BoxFuture<_>
})
}
}
impl QueryStrategyResult for Stream {
type Result<'query> = QueryStream<StreamResult<'query>>;
}
impl QueryStrategyImpl for Stream {
fn execute<'query, E>(
executor: E,
query: String,
values: Vec<Value<'query>>,
) -> Self::Result<'query>
where
E: AnyExecutor<'query>,
{
QueryStream::new(executor, query, values, |query| StreamResult {
stream: query.fetch_many(),
})
}
}
pub struct StreamResult<'stream> {
stream: BoxStream<'stream, sqlx::Result<sqlx::Either<AnyQueryResult, AnyRow>>>,
}
impl Unpin for StreamResult<'_> {}
impl stream::Stream for StreamResult<'_> {
type Item = Result<Row, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return Poll::Ready(match ready!(self.stream.as_mut().poll_next(cx)) {
None => None,
Some(Err(error)) => Some(Err(error.into())),
Some(Ok(sqlx::Either::Right(row))) => Some(Ok(Row(row))),
Some(Ok(sqlx::Either::Left(_result))) => continue,
});
}
}
}
#[cfg(test)]
mod test {
use crate::internal::executor::QueryWrapper;
#[test]
fn test_drop_order() {
struct BorrowStr<'a>(&'a str);
impl<'a> Drop for BorrowStr<'a> {
fn drop(&mut self) {
println!("{}", self.0);
}
}
let _w = QueryWrapper::new_basic(format!("Hello World"), BorrowStr);
}
}