Skip to main content

tank_core/
executor.rs

1use crate::{
2    AsQuery, Driver, DynQuery, Entity, Query, QueryResult, RawQuery, Result, RowLabeled,
3    RowsAffected,
4    stream::{Stream, StreamExt, TryStreamExt},
5    writer::SqlWriter,
6};
7use std::{future::Future, mem};
8
9/// Async query executor bound to a concrete `Driver`.
10///
11/// Responsibilities:
12/// - Translate high-level operations into driver queries
13/// - Stream results without buffering the entire result set (if possible)
14/// - Provide ergonomic helpers for common patterns
15///
16/// Implementors typically wrap a connection or pooled handle.
17pub trait Executor: Send + Sized {
18    /// Associated driver.
19    type Driver: Driver;
20
21    /// Returns true if the executor accepts multiple SQL statements in a single
22    /// request (e.g. `CREATE; INSERT; SELECT`). Defaults to `true`.
23    fn accepts_multiple_statements(&self) -> bool {
24        true
25    }
26
27    /// Driver instance.
28    ///
29    /// Default implementation returns `Default::default()` for the associated
30    /// `Driver`. Executors that carry per-connection or pooled driver state
31    /// should override this method to return the appropriate driver instance.
32    fn driver(&self) -> Self::Driver
33    where
34        Self: Sized,
35    {
36        Default::default()
37    }
38
39    /// Prepare a query for later execution.
40    fn prepare<'s>(
41        &'s mut self,
42        query: impl AsQuery<Self::Driver> + 's,
43    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send {
44        let mut query = query.as_query();
45        let query = mem::take(query.as_mut());
46        async {
47            match query {
48                Query::Raw(RawQuery(sql)) => self.do_prepare(sql).await,
49                Query::Prepared(..) => Ok(query),
50            }
51        }
52    }
53
54    /// Prepare a query for later execution.
55    fn do_prepare(
56        &mut self,
57        sql: String,
58    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send;
59
60    /// Run a query, streaming `QueryResult` items.
61    fn run<'s>(
62        &'s mut self,
63        query: impl AsQuery<Self::Driver> + 's,
64    ) -> impl Stream<Item = Result<QueryResult>> + Send;
65
66    /// Stream only labeled rows (filters non-row results).
67    fn fetch<'s>(
68        &'s mut self,
69        query: impl AsQuery<Self::Driver> + 's,
70    ) -> impl Stream<Item = Result<RowLabeled>> + Send {
71        self.run(query).filter_map(|v| async move {
72            match v {
73                Ok(QueryResult::Row(v)) => Some(Ok(v)),
74                Err(e) => Some(Err(e)),
75                _ => None,
76            }
77        })
78    }
79
80    /// Execute and aggregate affected rows.
81    fn execute<'s>(
82        &'s mut self,
83        query: impl AsQuery<Self::Driver> + 's,
84    ) -> impl Future<Output = Result<RowsAffected>> + Send {
85        self.run(query)
86            .filter_map(|v| async move {
87                match v {
88                    Ok(QueryResult::Affected(v)) => Some(Ok(v)),
89                    Err(e) => Some(Err(e)),
90                    _ => None,
91                }
92            })
93            .try_collect()
94    }
95
96    /// Insert many entities efficiently.
97    fn append<'a, E, It>(
98        &mut self,
99        entities: It,
100    ) -> impl Future<Output = Result<RowsAffected>> + Send
101    where
102        E: Entity + 'a,
103        It: IntoIterator<Item = &'a E> + Send,
104        <It as IntoIterator>::IntoIter: Send,
105    {
106        let mut query = DynQuery::default();
107        self.driver()
108            .sql_writer()
109            .write_insert(&mut query, entities, false);
110        self.execute(query)
111    }
112}