Skip to main content

tank_core/
executor.rs

1use crate::{
2    AsQuery, Driver, DynQuery, Entity, Query, QueryResult, RawQuery, Result, RowLabeled,
3    RowsAffected,
4    stream::{Stream, StreamExt, TryStreamExt},
5    writer::SqlWriter,
6};
7use std::{future::Future, mem};
8
9/// Async query execution.
10///
11/// Implemented by connections.
12pub trait Executor: Send + Sized {
13    /// Associated driver.
14    type Driver: Driver;
15
16    /// Supports multiple statements per request.
17    fn accepts_multiple_statements(&self) -> bool {
18        true
19    }
20
21    /// Get the driver instance.
22    fn driver(&self) -> Self::Driver
23    where
24        Self: Sized,
25    {
26        Default::default()
27    }
28
29    /// Prepare query.
30    fn prepare<'s>(
31        &'s mut self,
32        query: impl AsQuery<Self::Driver> + 's,
33    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
34        let mut query = query.as_query();
35        let query = mem::take(query.as_mut());
36        async {
37            match query {
38                Query::Raw(RawQuery(sql)) => self.do_prepare(sql).await,
39                Query::Prepared(..) => Ok(query),
40            }
41        }
42    }
43
44    /// Actual implementation for `prepare`.
45    fn do_prepare(
46        &mut self,
47        sql: String,
48    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send;
49
50    /// Execute a query, streaming `QueryResult` (rows or affected counts).
51    fn run<'s>(
52        &'s mut self,
53        query: impl AsQuery<Self::Driver> + 's,
54    ) -> impl Stream<Item = Result<QueryResult>> + Send;
55
56    /// Execute a query yielding `RowLabeled` from the resulting stream (filtering out `RowsAffected`).
57    fn fetch<'s>(
58        &'s mut self,
59        query: impl AsQuery<Self::Driver> + 's,
60    ) -> impl Stream<Item = Result<RowLabeled>> + Send {
61        self.run(query).filter_map(|v| async move {
62            match v {
63                Ok(QueryResult::Row(v)) => Some(Ok(v)),
64                Err(e) => Some(Err(e)),
65                _ => None,
66            }
67        })
68    }
69
70    /// Execute and aggregate affected rows counter.
71    fn execute<'s>(
72        &'s mut self,
73        query: impl AsQuery<Self::Driver> + 's,
74    ) -> impl Future<Output = Result<RowsAffected>> + Send {
75        self.run(query)
76            .filter_map(|v| async move {
77                match v {
78                    Ok(QueryResult::Affected(v)) => Some(Ok(v)),
79                    Err(e) => Some(Err(e)),
80                    _ => None,
81                }
82            })
83            .try_collect()
84    }
85
86    /// Insert many entities efficiently.
87    fn append<'a, E, It>(
88        &mut self,
89        entities: It,
90    ) -> impl Future<Output = Result<RowsAffected>> + Send
91    where
92        E: Entity + 'a,
93        It: IntoIterator<Item = &'a E> + Send,
94        <It as IntoIterator>::IntoIter: Send,
95    {
96        let mut query = DynQuery::default();
97        self.driver()
98            .sql_writer()
99            .write_insert(&mut query, entities, false);
100        self.execute(query)
101    }
102}