1use crate::{
2 AsQuery, Driver, DynQuery, Entity, Error, Query, QueryResult, RawQuery, Result, Row,
3 RowsAffected,
4 stream::{Stream, StreamExt, TryStreamExt},
5 writer::SqlWriter,
6};
7use convert_case::{Case, Casing};
8use std::{
9 future::{self, Future},
10 mem,
11};
12
13pub trait Executor: Send {
17 type Driver: Driver;
19
20 fn accepts_multiple_statements(&self) -> bool {
22 true
23 }
24
25 fn driver(&self) -> Self::Driver {
27 Default::default()
28 }
29
30 fn prepare<'s>(
32 &'s mut self,
33 query: impl AsQuery<Self::Driver> + 's,
34 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
35 let mut query = query.as_query();
36 let query = mem::take(query.as_mut());
37 async {
38 match query {
39 Query::Raw(RawQuery(sql)) => self.do_prepare(sql).await,
40 Query::Prepared(..) => Ok(query),
41 }
42 }
43 }
44
45 fn do_prepare(
47 &mut self,
48 _sql: String,
49 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
50 future::ready(Err(Error::msg(format!(
51 "{} does not support prepare",
52 self.driver().name().to_case(Case::Pascal)
53 ))))
54 }
55
56 fn run<'s>(
58 &'s mut self,
59 query: impl AsQuery<Self::Driver> + 's,
60 ) -> impl Stream<Item = Result<QueryResult>> + Send;
61
62 fn fetch<'s>(
64 &'s mut self,
65 query: impl AsQuery<Self::Driver> + 's,
66 ) -> impl Stream<Item = Result<Row>> + Send {
67 self.run(query).filter_map(|v| async move {
68 match v {
69 Ok(QueryResult::Row(v)) => Some(Ok(v)),
70 Err(e) => Some(Err(e)),
71 _ => None,
72 }
73 })
74 }
75
76 fn execute<'s>(
78 &'s mut self,
79 query: impl AsQuery<Self::Driver> + 's,
80 ) -> impl Future<Output = Result<RowsAffected>> + Send {
81 self.run(query)
82 .filter_map(|v| async move {
83 match v {
84 Ok(QueryResult::Affected(v)) => Some(Ok(v)),
85 Err(e) => Some(Err(e)),
86 _ => None,
87 }
88 })
89 .try_collect()
90 }
91
92 fn append<'a, E, It>(
94 &mut self,
95 entities: It,
96 ) -> impl Future<Output = Result<RowsAffected>> + Send
97 where
98 E: Entity + 'a,
99 It: IntoIterator<Item = &'a E> + Send,
100 <It as IntoIterator>::IntoIter: Send,
101 {
102 let mut query = DynQuery::default();
103 self.driver()
104 .sql_writer()
105 .write_insert(&mut query, entities, false);
106 self.execute(query)
107 }
108}