tank_core/
executor.rs

1//! Execution abstraction wrapping a database `Driver`.
2//! NOTE: All returned Futures and Streams MUST be awaited / fully consumed.
3//! Some drivers may side-effect early, but for portability and correctness
4//! always await the future or exhaust the stream.
5//!
6//! The `Executor` trait provides a uniform, async/stream-based interface for:
7//! - Preparing parameterized queries (`prepare`)
8//! - Running arbitrary queries yielding heterogeneous results (`run`)
9//! - Convenience adapters to obtain only rows (`fetch`) or only affected counts (`execute`)
10//! - Bulk data ingestion (`append`), with graceful fallback for drivers lacking append semantics.
11//!
12//! Streams:
13//! `run` yields `QueryResult` items. Higher-level helpers (`fetch`, `execute`) filter & map only the
14//! variants they care about, propagating errors while discarding unrelated items.
15//!
16//! Lifetimes:
17//! `fetch` ties the stream lifetime `'s` to `&'s mut self`, ensuring the executor outlives row decoding.
18//!
19//! Fallbacks:
20//! `append` always emits an INSERT when the driver has no dedicated append/merge support.
21//!
22//! Awaiting:
23//! Every method returning a Future or Stream must be awaited / polled to completion. Some drivers
24//! might perform effects before awaiting, but relying on that is undefined behavior across drivers.
25#![doc = r#"
26The `Executor` trait provides a uniform, async/stream-based interface for:
27- Preparing parameterized queries (`prepare`)
28- Running arbitrary queries yielding heterogeneous results (`run`)
29- Convenience adapters to obtain only rows (`fetch`) or only affected counts (`execute`)
30- Bulk data ingestion (`append`), with graceful fallback for drivers lacking append semantics.
31
32Streams:
33`run` yields `QueryResult` items. Higher-level helpers (`fetch`, `execute`) filter & map only the
34variants they care about, propagating errors while discarding unrelated items.
35
36Lifetimes:
37`fetch` ties the stream lifetime `'s` to `&'s mut self`, ensuring the executor outlives row decoding.
38
39Fallbacks:
40`append` always emits an INSERT when the driver has no dedicated append/merge support.
41
42Awaiting:
43Every method returning a Future or Stream must be awaited / polled to completion. Some drivers
44might perform effects before awaiting, but relying on that is undefined behavior across drivers.
45"#]
46
47use crate::{
48    AsQuery, Driver, Entity, Query, QueryResult, Result, RowLabeled, RowsAffected,
49    stream::{Stream, StreamExt, TryStreamExt},
50    writer::SqlWriter,
51};
52use std::future::Future;
53
54/// Async query executor bound to a concrete `Driver`.
55///
56/// Responsibilities:
57/// - Translate high-level operations into driver queries
58/// - Stream results without buffering the entire result set (if possible)
59/// - Provide ergonomic helpers for common patterns
60///
61/// Implementors typically wrap a connection or pooled handle.
62pub trait Executor: Send + Sized {
63    /// Underlying driver type supplying SQL dialect + I/O.
64    type Driver: Driver;
65
66    /// Access the driver instance.
67    fn driver(&self) -> &Self::Driver;
68
69    /// Prepare a query (e.g. statement caching / parameter binding) returning a `Query`.
70    ///
71    /// Await/Consume:
72    /// - Must be awaited; preparation may allocate resources on the driver.
73    ///
74    /// Errors:
75    /// - Driver-specific preparation failures.
76    fn prepare(
77        &mut self,
78        query: String,
79    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send;
80
81    /// Run an already prepared query, streaming heterogeneous `QueryResult` items.
82    ///
83    /// Await/Consume:
84    /// - You must drive the returned stream to completion (or until you intentionally stop).
85    ///
86    /// Stream Items:
87    /// - `QueryResult::Row` for each produced row.
88    /// - `QueryResult::Affected` for write operations (may appear before/after rows depending on driver).
89    ///
90    /// Errors:
91    /// - Emitted inline in the stream; consumers should use `TryStreamExt`.
92    fn run<'s>(
93        &'s mut self,
94        query: impl AsQuery<Self::Driver> + 's,
95    ) -> impl Stream<Item = Result<QueryResult>> + Send;
96
97    /// Run a query and stream only labeled rows, filtering out non-row results.
98    ///
99    /// Await/Consume:
100    /// - Consume the stream fully if you expect to release underlying resources cleanly.
101    ///
102    /// Each error from `run` is forwarded; affected-count results are discarded.
103    fn fetch<'s>(
104        &'s mut self,
105        query: impl AsQuery<Self::Driver> + 's,
106    ) -> impl Stream<Item = Result<RowLabeled>> + Send + 's {
107        self.run(query).filter_map(|v| async move {
108            match v {
109                Ok(QueryResult::Row(v)) => Some(Ok(v)),
110                Err(e) => Some(Err(e)),
111                _ => None,
112            }
113        })
114    }
115
116    /// Execute a query and return a single aggregated `RowsAffected`.
117    ///
118    /// Await/Consume:
119    /// - Must be awaited; no side-effects are guaranteed until completion.
120    ///
121    /// If a driver returns multiple `QueryResult::Affected` values, they are combined via `FromIterator`
122    /// (driver/module must provide the appropriate implementation).
123    fn execute<'s>(
124        &'s mut self,
125        query: impl AsQuery<Self::Driver> + 's,
126    ) -> impl Future<Output = Result<RowsAffected>> + Send {
127        self.run(query)
128            .filter_map(|v| async move {
129                match v {
130                    Ok(QueryResult::Affected(v)) => Some(Ok(v)),
131                    Err(e) => Some(Err(e)),
132                    _ => None,
133                }
134            })
135            .try_collect()
136    }
137
138    /// Append entities to a table.
139    ///
140    /// Await/Consume:
141    /// - Must be awaited; insertion may be deferred until polled.
142    ///
143    /// Semantics:
144    /// - Uses driver append/ingest feature when supported.
145    /// - Falls back to plain INSERT statements via `sql_writer().write_insert(..., false)` otherwise.
146    ///
147    /// Returns:
148    /// - Total number of inserted rows.
149    fn append<'a, E, It>(
150        &mut self,
151        entities: It,
152    ) -> impl Future<Output = Result<RowsAffected>> + Send
153    where
154        E: Entity + 'a,
155        It: IntoIterator<Item = &'a E> + Send,
156    {
157        let mut query = String::new();
158        self.driver()
159            .sql_writer()
160            .write_insert(&mut query, entities, false);
161        self.execute(query)
162    }
163}