Skip to main content

tank_core/
executor.rs

1use crate::{
2    AsQuery, Driver, DynQuery, Entity, Error, Query, QueryResult, RawQuery, Result, RowLabeled,
3    RowsAffected,
4    stream::{Stream, StreamExt, TryStreamExt},
5    writer::SqlWriter,
6};
7use convert_case::{Case, Casing};
8use std::{
9    future::{self, Future},
10    mem,
11};
12
13/// Async query execution.
14///
15/// Implemented by connections.
16pub trait Executor: Send + Sized {
17    /// Associated driver.
18    type Driver: Driver;
19
20    /// Supports multiple statements per request.
21    fn accepts_multiple_statements(&self) -> bool {
22        true
23    }
24
25    /// Get the driver instance.
26    fn driver(&self) -> Self::Driver
27    where
28        Self: Sized,
29    {
30        Default::default()
31    }
32
33    /// Prepare query.
34    fn prepare<'s>(
35        &'s mut self,
36        query: impl AsQuery<Self::Driver> + 's,
37    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
38        let mut query = query.as_query();
39        let query = mem::take(query.as_mut());
40        async {
41            match query {
42                Query::Raw(RawQuery(sql)) => self.do_prepare(sql).await,
43                Query::Prepared(..) => Ok(query),
44            }
45        }
46    }
47
48    /// Actual implementation for `prepare`.
49    fn do_prepare(
50        &mut self,
51        _sql: String,
52    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
53        future::ready(Err(Error::msg(format!(
54            "{} does not support prepare",
55            self.driver().name().to_case(Case::Pascal)
56        ))))
57    }
58
59    /// Execute a query, streaming `QueryResult` (rows or affected counts).
60    fn run<'s>(
61        &'s mut self,
62        query: impl AsQuery<Self::Driver> + 's,
63    ) -> impl Stream<Item = Result<QueryResult>> + Send;
64
65    /// Execute a query yielding `RowLabeled` from the resulting stream (filtering out `RowsAffected`).
66    fn fetch<'s>(
67        &'s mut self,
68        query: impl AsQuery<Self::Driver> + 's,
69    ) -> impl Stream<Item = Result<RowLabeled>> + Send {
70        self.run(query).filter_map(|v| async move {
71            match v {
72                Ok(QueryResult::Row(v)) => Some(Ok(v)),
73                Err(e) => Some(Err(e)),
74                _ => None,
75            }
76        })
77    }
78
79    /// Execute and aggregate affected rows counter.
80    fn execute<'s>(
81        &'s mut self,
82        query: impl AsQuery<Self::Driver> + 's,
83    ) -> impl Future<Output = Result<RowsAffected>> + Send {
84        self.run(query)
85            .filter_map(|v| async move {
86                match v {
87                    Ok(QueryResult::Affected(v)) => Some(Ok(v)),
88                    Err(e) => Some(Err(e)),
89                    _ => None,
90                }
91            })
92            .try_collect()
93    }
94
95    /// Insert many entities efficiently.
96    fn append<'a, E, It>(
97        &mut self,
98        entities: It,
99    ) -> impl Future<Output = Result<RowsAffected>> + Send
100    where
101        E: Entity + 'a,
102        It: IntoIterator<Item = &'a E> + Send,
103        <It as IntoIterator>::IntoIter: Send,
104    {
105        let mut query = DynQuery::default();
106        self.driver()
107            .sql_writer()
108            .write_insert(&mut query, entities, false);
109        self.execute(query)
110    }
111}