use diesel::{
AsChangeset, associations::HasTable, query_builder::IntoUpdateTarget, result::QueryResult,
};
use futures_util::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt, pin_mut};
pub use methods::{ExecuteDsl, LoadQuery};
use crate::AsyncExecute;
pub mod methods {
use diesel::{
backend::Backend,
deserialize::FromSqlRow,
expression::QueryMetadata,
query_builder::{AsQuery, QueryFragment, QueryId},
query_dsl::CompatibleType,
};
use futures_util::{Stream, StreamExt};
use super::*;
use crate::AsyncExecute;
/// Low-level trait that [`RunQueryDsl::execute`] delegates to: runs a query on an async
/// connection and resolves to a `usize`.
///
/// `DB` defaults to `<Conn as AsyncExecute>::Backend`, and the `Backend = DB` bound on `Conn`
/// keeps the two in agreement.
pub trait ExecuteDsl<Conn, DB = <Conn as AsyncExecute>::Backend>
where
Conn: AsyncExecute<Backend = DB>,
DB: Backend,
{
/// Executes `query` on `conn`.
///
/// The returned future must be `Send` and resolves to a `QueryResult<usize>`
/// (presumably the number of affected rows — confirm against `AsyncExecute`).
fn execute(query: Self, conn: &mut Conn)
-> impl Future<Output = QueryResult<usize>> + Send;
}
/// Blanket implementation: any `Send` value that is a `QueryFragment` for the connection's
/// backend and has a `QueryId` can be executed.
impl<Conn, DB, T> ExecuteDsl<Conn, DB> for T
where
    T: QueryFragment<DB> + QueryId + Send,
    DB: Backend,
    Conn: AsyncExecute<Backend = DB>,
{
    /// Hands `query` to `conn.execute_returning_count` and returns whatever it resolves to.
    async fn execute(query: Self, conn: &mut Conn) -> QueryResult<usize> {
        let pending = conn.execute_returning_count(query);
        pending.await
    }
}
/// Low-level trait that [`RunQueryDsl::load_stream`] delegates to: runs a query on `Conn`
/// and exposes the result as a stream of `QueryResult<U>` items.
pub trait LoadQuery<Conn: AsyncExecute, U> {
/// Consumes the query and runs it on `conn`.
///
/// The returned future is `Send`. It resolves either to an error or to a `Send` stream;
/// every stream item is itself a `QueryResult<U>`, so failures can also surface per item.
fn internal_load(
self,
conn: &mut Conn,
) -> impl Future<Output = QueryResult<impl Stream<Item = QueryResult<U>> + Send>> + Send;
}
/// Blanket implementation: any `Send` query (`AsQuery`) whose SQL type is compatible with `U`
/// can be loaded from a connection using backend `DB`.
///
/// Each type parameter is bounded exactly once; `ST` is the SQL type that `U` is
/// deserialized from, as selected by `CompatibleType`.
impl<Conn, DB, T, U, ST> LoadQuery<Conn, U> for T
where
    Conn: AsyncExecute<Backend = DB>,
    DB: Backend + QueryMetadata<T::SqlType> + 'static,
    T: AsQuery + Send,
    T::Query: QueryFragment<DB> + QueryId + Send,
    T::SqlType: CompatibleType<U, DB, SqlType = ST>,
    U: FromSqlRow<ST, DB> + Send + 'static,
    ST: 'static,
{
    /// Runs the query via `conn.load` and maps every row of the resulting stream to a `U`.
    ///
    /// A row that is already an error is passed through (`row?`); a row that fails
    /// `U::build_from_row` is reported as `Error::DeserializationError`.
    ///
    /// The `+ Send` on the stream is spelled out so the signature matches the trait
    /// declaration, which requires a `Send` stream.
    async fn internal_load(
        self,
        conn: &mut Conn,
    ) -> QueryResult<impl Stream<Item = QueryResult<U>> + Send> {
        conn.load(self)
            // Only the success value of the load future is touched; a failure to start
            // the query is returned unchanged.
            .map_ok(|rows| {
                rows.map(|row| {
                    U::build_from_row(&row?)
                        .map_err(diesel::result::Error::DeserializationError)
                })
            })
            .await
    }
}
}
pub trait RunQueryDsl<Conn>: Sized + Send {
fn execute(self, conn: &mut Conn) -> impl Future<Output = QueryResult<usize>> + Send
where
Conn: AsyncExecute + Send,
Self: methods::ExecuteDsl<Conn>,
{
methods::ExecuteDsl::execute(self, conn)
}
fn load<U>(self, conn: &mut Conn) -> impl Future<Output = QueryResult<Vec<U>>> + Send
where
U: Send,
Conn: AsyncExecute,
Self: methods::LoadQuery<Conn, U>,
{
self.load_stream(conn).and_then(TryStreamExt::try_collect)
}
fn load_stream<U>(
self,
conn: &mut Conn,
) -> impl Future<Output = QueryResult<impl Stream<Item = QueryResult<U>> + Send>> + Send
where
Conn: AsyncExecute,
Self: methods::LoadQuery<Conn, U>,
{
self.internal_load(conn)
}
fn get_result<U>(self, conn: &mut Conn) -> impl Future<Output = QueryResult<U>> + Send
where
U: Send,
Conn: AsyncExecute,
Self: methods::LoadQuery<Conn, U>,
{
async {
let stream = self.load_stream(conn).await?;
pin_mut!(stream);
stream
.next()
.await
.unwrap_or(Err(diesel::result::Error::NotFound))
}
}
fn get_results<U>(self, conn: &mut Conn) -> impl Future<Output = QueryResult<Vec<U>>> + Send
where
U: Send,
Conn: AsyncExecute,
Self: methods::LoadQuery<Conn, U>,
{
self.load(conn)
}
fn first<U>(self, conn: &mut Conn) -> impl Future<Output = QueryResult<U>> + Send
where
U: Send,
Conn: AsyncExecute,
Self: diesel::query_dsl::methods::LimitDsl,
diesel::dsl::Limit<Self>: methods::LoadQuery<Conn, U> + Send,
{
diesel::query_dsl::methods::LimitDsl::limit(self, 1).get_result(conn)
}
}
/// Blanket implementation: every `Send` type gets the [`RunQueryDsl`] methods for any
/// `Conn`; the `where` clause on each method decides whether that method can be called.
impl<T: Send, Conn> RunQueryDsl<Conn> for T {}
/// Extension trait that adds `save_changes` to changeset-like values.
pub trait SaveChangesDsl<Conn> {
    /// Passes `self` to the connection's [`UpdateAndFetchResults::update_and_fetch`] and
    /// returns the future it produces, which resolves to a `QueryResult<T>`.
    ///
    /// Only callable when `Self` is `Identifiable` and `Conn` implements
    /// `UpdateAndFetchResults<Self, T>`.
    fn save_changes<T>(self, connection: &mut Conn) -> impl Future<Output = QueryResult<T>> + Send
    where
        Self: Sized + diesel::prelude::Identifiable,
        Conn: UpdateAndFetchResults<Self, T>,
    {
        <Conn as UpdateAndFetchResults<Self, T>>::update_and_fetch(connection, self)
    }
}
/// Blanket implementation: [`SaveChangesDsl`] is available, for any `Conn`, on every `Copy`
/// type that is an `AsChangeset` targeting its own table (`<T as HasTable>::Table`) and
/// that implements `IntoUpdateTarget`.
impl<T, Conn> SaveChangesDsl<Conn> for T where
T: Copy + AsChangeset<Target = <T as HasTable>::Table> + IntoUpdateTarget
{
}
/// Connection-side operation that [`SaveChangesDsl::save_changes`] delegates to.
///
/// The trait extends [`AsyncExecute`]; `Changes` must be `Identifiable` and have a table
/// (`HasTable`), and `Output` is the type the operation resolves to.
pub trait UpdateAndFetchResults<Changes, Output>: AsyncExecute
where
Changes: diesel::prelude::Identifiable + HasTable,
{
/// Consumes `changeset` and returns a `Send` future resolving to `QueryResult<Output>`.
///
/// NOTE(review): judging by the name, implementors are expected to apply the changeset as
/// an update and then fetch the resulting record — no implementation is visible here,
/// so confirm against the concrete connection types.
fn update_and_fetch(
&mut self,
changeset: Changes,
) -> impl Future<Output = QueryResult<Output>> + Send;
}