Skip to main content

tank_core/
executor.rs

1use crate::{
2    AsQuery, Driver, DynQuery, Entity, Error, Query, QueryResult, RawQuery, Result, Row,
3    RowsAffected,
4    stream::{Stream, StreamExt, TryStreamExt},
5    writer::SqlWriter,
6};
7use convert_case::{Case, Casing};
8use std::{
9    future::{self, Future},
10    mem,
11};
12
13/// Async query execution.
14///
15/// Implemented by connections.
16pub trait Executor: Send {
17    /// Associated driver.
18    type Driver: Driver;
19
20    /// Checks if the driver supports multiple SQL statements in a single request.
21    fn accepts_multiple_statements(&self) -> bool {
22        true
23    }
24
25    /// Returns the driver instance associated with this executor.
26    fn driver(&self) -> Self::Driver {
27        Default::default()
28    }
29
30    /// Prepares a query for execution, returning a handle to the prepared statement.
31    fn prepare<'s>(
32        &'s mut self,
33        query: impl AsQuery<Self::Driver> + 's,
34    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
35        let mut query = query.as_query();
36        let query = mem::take(query.as_mut());
37        async {
38            match query {
39                Query::Raw(RawQuery(sql)) => self.do_prepare(sql).await,
40                Query::Prepared(..) => Ok(query),
41            }
42        }
43    }
44
45    /// Internal hook for implementing prepared statement support.
46    fn do_prepare(
47        &mut self,
48        _sql: String,
49    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
50        future::ready(Err(Error::msg(format!(
51            "{} does not support prepare",
52            self.driver().name().to_case(Case::Pascal)
53        ))))
54    }
55
56    /// Executes a query and streams the results (rows or affected counts).
57    fn run<'s>(
58        &'s mut self,
59        query: impl AsQuery<Self::Driver> + 's,
60    ) -> impl Stream<Item = Result<QueryResult>> + Send;
61
62    /// Executes a query and streams the resulting rows, ignoring affected counts.
63    fn fetch<'s>(
64        &'s mut self,
65        query: impl AsQuery<Self::Driver> + 's,
66    ) -> impl Stream<Item = Result<Row>> + Send {
67        self.run(query).filter_map(|v| async move {
68            match v {
69                Ok(QueryResult::Row(v)) => Some(Ok(v)),
70                Err(e) => Some(Err(e)),
71                _ => None,
72            }
73        })
74    }
75
76    /// Executes a query and returns the total number of affected rows.
77    fn execute<'s>(
78        &'s mut self,
79        query: impl AsQuery<Self::Driver> + 's,
80    ) -> impl Future<Output = Result<RowsAffected>> + Send {
81        self.run(query)
82            .filter_map(|v| async move {
83                match v {
84                    Ok(QueryResult::Affected(v)) => Some(Ok(v)),
85                    Err(e) => Some(Err(e)),
86                    _ => None,
87                }
88            })
89            .try_collect()
90    }
91
92    /// Efficiently inserts a collection of entities bypassing regular SQL execution when supported by the driver.
93    fn append<'a, E, It>(
94        &mut self,
95        entities: It,
96    ) -> impl Future<Output = Result<RowsAffected>> + Send
97    where
98        E: Entity + 'a,
99        It: IntoIterator<Item = &'a E> + Send,
100        <It as IntoIterator>::IntoIter: Send,
101    {
102        let mut query = DynQuery::default();
103        self.driver()
104            .sql_writer()
105            .write_insert(&mut query, entities, false);
106        self.execute(query)
107    }
108}