tank_core/
executor.rs

1//! Execution abstraction wrapping a database `Driver`.
2//! NOTE: All returned Futures and Streams MUST be awaited / fully consumed.
3//! Some drivers may side-effect early, but for portability and correctness
4//! always await the future or exhaust the stream.
5//!
6//! The `Executor` trait provides a uniform, async/stream-based interface for:
7//! - Preparing parameterized queries (`prepare`)
8//! - Running arbitrary queries yielding heterogeneous results (`run`)
9//! - Convenience adapters to obtain only rows (`fetch`) or only affected counts (`execute`)
10//! - Bulk data ingestion (`append`), with graceful fallback for drivers lacking append semantics.
11//!
12//! Streams:
13//! `run` yields `QueryResult` items. Higher-level helpers (`fetch`, `execute`) filter & map only the
14//! variants they care about, propagating errors while discarding unrelated items.
15//!
16//! Lifetimes:
17//! `fetch` ties the stream lifetime `'s` to `&'s mut self`, ensuring the executor outlives row decoding.
18//!
19//! Fallbacks:
20//! `append` always emits an INSERT when the driver has no dedicated append/merge support.
21//!
22//! Awaiting:
//! Every method returning a Future or Stream must be awaited / polled to completion. Some drivers
//! might perform effects before awaiting, but relying on that is not portable across drivers.
25#![doc = r#"
26The `Executor` trait provides a uniform, async/stream-based interface for:
27- Preparing parameterized queries (`prepare`)
28- Running arbitrary queries yielding heterogeneous results (`run`)
29- Convenience adapters to obtain only rows (`fetch`) or only affected counts (`execute`)
30- Bulk data ingestion (`append`), with graceful fallback for drivers lacking append semantics.
31
32Streams:
33`run` yields `QueryResult` items. Higher-level helpers (`fetch`, `execute`) filter & map only the
34variants they care about, propagating errors while discarding unrelated items.
35
36Lifetimes:
37`fetch` ties the stream lifetime `'s` to `&'s mut self`, ensuring the executor outlives row decoding.
38
39Fallbacks:
40`append` always emits an INSERT when the driver has no dedicated append/merge support.
41
42Awaiting:
Every method returning a Future or Stream must be awaited / polled to completion. Some drivers
might perform effects before awaiting, but relying on that is not portable across drivers.
45"#]
46
47use crate::{
48    Driver, Entity, Query, QueryResult, Result, RowLabeled, RowsAffected,
49    stream::{Stream, StreamExt, TryStreamExt},
50    writer::SqlWriter,
51};
52use std::future::Future;
53
54/// Async query executor bound to a concrete `Driver`.
55///
56/// Responsibilities:
57/// - Translate high-level operations into driver queries
58/// - Stream results without buffering the entire result set (if possible)
59/// - Provide ergonomic helpers for common patterns
60///
61/// Implementors typically wrap a connection or pooled handle.
62pub trait Executor: Send + Sized {
63    /// Underlying driver type supplying SQL dialect + I/O.
64    type Driver: Driver;
65
66    /// Access the driver instance.
67    fn driver(&self) -> &Self::Driver;
68
69    /// Prepare a query (e.g. statement caching / parameter binding) returning a `Query`.
70    ///
71    /// Await/Consume:
72    /// - Must be awaited; preparation may allocate resources on the driver.
73    ///
74    /// Errors:
75    /// - Driver-specific preparation failures.
76    fn prepare(
77        &mut self,
78        query: String,
79    ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send;
80
81    /// Run an already prepared query, streaming heterogeneous `QueryResult` items.
82    ///
83    /// Await/Consume:
84    /// - You must drive the returned stream to completion (or until you intentionally stop).
85    ///
86    /// Stream Items:
87    /// - `QueryResult::Row` for each produced row.
88    /// - `QueryResult::Affected` for write operations (may appear before/after rows depending on driver).
89    ///
90    /// Errors:
91    /// - Emitted inline in the stream; consumers should use `TryStreamExt`.
92    fn run(&mut self, query: Query<Self::Driver>)
93    -> impl Stream<Item = Result<QueryResult>> + Send;
94
95    /// Run a query and stream only labeled rows, filtering out non-row results.
96    ///
97    /// Await/Consume:
98    /// - Consume the stream fully if you expect to release underlying resources cleanly.
99    ///
100    /// Each error from `run` is forwarded; affected-count results are discarded.
101    fn fetch<'s>(
102        &'s mut self,
103        query: Query<Self::Driver>,
104    ) -> impl Stream<Item = Result<RowLabeled>> + Send + 's {
105        self.run(query).filter_map(|v| async move {
106            match v {
107                Ok(QueryResult::Row(v)) => Some(Ok(v)),
108                Err(e) => Some(Err(e)),
109                _ => None,
110            }
111        })
112    }
113
114    /// Execute a query and return a single aggregated `RowsAffected`.
115    ///
116    /// Await/Consume:
117    /// - Must be awaited; no side-effects are guaranteed until completion.
118    ///
119    /// If a driver returns multiple `QueryResult::Affected` values, they are combined via `FromIterator`
120    /// (driver/module must provide the appropriate implementation).
121    fn execute(
122        &mut self,
123        query: Query<Self::Driver>,
124    ) -> impl Future<Output = Result<RowsAffected>> + Send {
125        self.run(query)
126            .filter_map(|v| async move {
127                match v {
128                    Ok(QueryResult::Affected(v)) => Some(Ok(v)),
129                    Err(e) => Some(Err(e)),
130                    _ => None,
131                }
132            })
133            .try_collect()
134    }
135
136    /// Append entities to a table.
137    ///
138    /// Await/Consume:
139    /// - Must be awaited; insertion may be deferred until polled.
140    ///
141    /// Semantics:
142    /// - Uses driver append/ingest feature when supported.
143    /// - Falls back to plain INSERT statements via `sql_writer().write_insert(..., false)` otherwise.
144    ///
145    /// Returns:
146    /// - Total number of inserted rows.
147    fn append<'a, E, It>(
148        &mut self,
149        entities: It,
150    ) -> impl Future<Output = Result<RowsAffected>> + Send
151    where
152        E: Entity + 'a,
153        It: IntoIterator<Item = &'a E> + Send,
154    {
155        let mut query = String::new();
156        self.driver()
157            .sql_writer()
158            .write_insert(&mut query, entities, false);
159        self.execute(query.into())
160    }
161}