use crate::{
AsQuery, Driver, DynQuery, Entity, Error, Query, QueryResult, RawQuery, Result, Row,
RowsAffected,
stream::{Stream, StreamExt, TryStreamExt},
writer::SqlWriter,
};
use convert_case::{Case, Casing};
use std::{
future::{self, Future},
mem,
};
pub trait Executor: Send {
type Driver: Driver;
fn accepts_multiple_statements(&self) -> bool {
true
}
fn driver(&self) -> Self::Driver {
Default::default()
}
fn prepare<'s>(
&'s mut self,
query: impl AsQuery<Self::Driver> + 's,
) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
let mut query = query.as_query();
let query = mem::take(query.as_mut());
async {
match query {
Query::Raw(RawQuery(sql)) => self.do_prepare(sql).await,
Query::Prepared(..) => Ok(query),
}
}
}
fn do_prepare(
&mut self,
_sql: String,
) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
future::ready(Err(Error::msg(format!(
"{} does not support prepare",
self.driver().name().to_case(Case::Pascal)
))))
}
fn run<'s>(
&'s mut self,
query: impl AsQuery<Self::Driver> + 's,
) -> impl Stream<Item = Result<QueryResult>> + Send;
fn fetch<'s>(
&'s mut self,
query: impl AsQuery<Self::Driver> + 's,
) -> impl Stream<Item = Result<Row>> + Send {
self.run(query).filter_map(|v| async move {
match v {
Ok(QueryResult::Row(v)) => Some(Ok(v)),
Err(e) => Some(Err(e)),
_ => None,
}
})
}
fn execute<'s>(
&'s mut self,
query: impl AsQuery<Self::Driver> + 's,
) -> impl Future<Output = Result<RowsAffected>> + Send {
self.run(query)
.filter_map(|v| async move {
match v {
Ok(QueryResult::Affected(v)) => Some(Ok(v)),
Err(e) => Some(Err(e)),
_ => None,
}
})
.try_collect()
}
fn append<'a, E, It>(
&mut self,
entities: It,
) -> impl Future<Output = Result<RowsAffected>> + Send
where
E: Entity + 'a,
It: IntoIterator<Item = &'a E> + Send,
<It as IntoIterator>::IntoIter: Send,
{
let mut query = DynQuery::default();
self.driver()
.sql_writer()
.write_insert(&mut query, entities, false);
self.execute(query)
}
}