1use crate::{
2 AsQuery, Driver, DynQuery, Entity, Error, Query, QueryResult, RawQuery, Result, RowLabeled,
3 RowsAffected,
4 stream::{Stream, StreamExt, TryStreamExt},
5 writer::SqlWriter,
6};
7use convert_case::{Case, Casing};
8use std::{
9 future::{self, Future},
10 mem,
11};
12
13pub trait Executor: Send + Sized {
17 type Driver: Driver;
19
20 fn accepts_multiple_statements(&self) -> bool {
22 true
23 }
24
25 fn driver(&self) -> Self::Driver
27 where
28 Self: Sized,
29 {
30 Default::default()
31 }
32
33 fn prepare<'s>(
35 &'s mut self,
36 query: impl AsQuery<Self::Driver> + 's,
37 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
38 let mut query = query.as_query();
39 let query = mem::take(query.as_mut());
40 async {
41 match query {
42 Query::Raw(RawQuery(sql)) => self.do_prepare(sql).await,
43 Query::Prepared(..) => Ok(query),
44 }
45 }
46 }
47
48 fn do_prepare(
50 &mut self,
51 _sql: String,
52 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
53 future::ready(Err(Error::msg(format!(
54 "{} does not support prepare",
55 self.driver().name().to_case(Case::Pascal)
56 ))))
57 }
58
59 fn run<'s>(
61 &'s mut self,
62 query: impl AsQuery<Self::Driver> + 's,
63 ) -> impl Stream<Item = Result<QueryResult>> + Send;
64
65 fn fetch<'s>(
67 &'s mut self,
68 query: impl AsQuery<Self::Driver> + 's,
69 ) -> impl Stream<Item = Result<RowLabeled>> + Send {
70 self.run(query).filter_map(|v| async move {
71 match v {
72 Ok(QueryResult::Row(v)) => Some(Ok(v)),
73 Err(e) => Some(Err(e)),
74 _ => None,
75 }
76 })
77 }
78
79 fn execute<'s>(
81 &'s mut self,
82 query: impl AsQuery<Self::Driver> + 's,
83 ) -> impl Future<Output = Result<RowsAffected>> + Send {
84 self.run(query)
85 .filter_map(|v| async move {
86 match v {
87 Ok(QueryResult::Affected(v)) => Some(Ok(v)),
88 Err(e) => Some(Err(e)),
89 _ => None,
90 }
91 })
92 .try_collect()
93 }
94
95 fn append<'a, E, It>(
97 &mut self,
98 entities: It,
99 ) -> impl Future<Output = Result<RowsAffected>> + Send
100 where
101 E: Entity + 'a,
102 It: IntoIterator<Item = &'a E> + Send,
103 <It as IntoIterator>::IntoIter: Send,
104 {
105 let mut query = DynQuery::default();
106 self.driver()
107 .sql_writer()
108 .write_insert(&mut query, entities, false);
109 self.execute(query)
110 }
111}