1use crate::{
2 AsQuery, Driver, DynQuery, Entity, Query, QueryResult, RawQuery, Result, RowLabeled,
3 RowsAffected,
4 stream::{Stream, StreamExt, TryStreamExt},
5 writer::SqlWriter,
6};
7use std::{future::Future, mem};
8
9pub trait Executor: Send + Sized {
18 type Driver: Driver;
20
21 fn accepts_multiple_statements(&self) -> bool {
24 true
25 }
26
27 fn driver(&self) -> Self::Driver
33 where
34 Self: Sized,
35 {
36 Default::default()
37 }
38
39 fn prepare<'s>(
41 &'s mut self,
42 query: impl AsQuery<Self::Driver> + 's,
43 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
44 let mut query = query.as_query();
45 let query = mem::take(query.as_mut());
46 async {
47 match query {
48 Query::Raw(RawQuery(sql)) => self.do_prepare(sql).await,
49 Query::Prepared(..) => Ok(query),
50 }
51 }
52 }
53
54 fn do_prepare(
56 &mut self,
57 sql: String,
58 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send;
59
60 fn run<'s>(
62 &'s mut self,
63 query: impl AsQuery<Self::Driver> + 's,
64 ) -> impl Stream<Item = Result<QueryResult>> + Send;
65
66 fn fetch<'s>(
68 &'s mut self,
69 query: impl AsQuery<Self::Driver> + 's,
70 ) -> impl Stream<Item = Result<RowLabeled>> + Send {
71 self.run(query).filter_map(|v| async move {
72 match v {
73 Ok(QueryResult::Row(v)) => Some(Ok(v)),
74 Err(e) => Some(Err(e)),
75 _ => None,
76 }
77 })
78 }
79
80 fn execute<'s>(
82 &'s mut self,
83 query: impl AsQuery<Self::Driver> + 's,
84 ) -> impl Future<Output = Result<RowsAffected>> + Send {
85 self.run(query)
86 .filter_map(|v| async move {
87 match v {
88 Ok(QueryResult::Affected(v)) => Some(Ok(v)),
89 Err(e) => Some(Err(e)),
90 _ => None,
91 }
92 })
93 .try_collect()
94 }
95
96 fn append<'a, E, It>(
98 &mut self,
99 entities: It,
100 ) -> impl Future<Output = Result<RowsAffected>> + Send
101 where
102 E: Entity + 'a,
103 It: IntoIterator<Item = &'a E> + Send,
104 <It as IntoIterator>::IntoIter: Send,
105 {
106 let mut query = DynQuery::default();
107 self.driver()
108 .sql_writer()
109 .write_insert(&mut query, entities, false);
110 self.execute(query)
111 }
112}