tank_core/executor.rs
1//! Execution abstraction wrapping a database `Driver`.
2//! NOTE: All returned Futures and Streams MUST be awaited / fully consumed.
3//! Some drivers may side-effect early, but for portability and correctness
4//! always await the future or exhaust the stream.
5//!
6//! The `Executor` trait provides a uniform, async/stream-based interface for:
7//! - Preparing parameterized queries (`prepare`)
8//! - Running arbitrary queries yielding heterogeneous results (`run`)
9//! - Convenience adapters to obtain only rows (`fetch`) or only affected counts (`execute`)
10//! - Bulk data ingestion (`append`), with graceful fallback for drivers lacking append semantics.
11//!
12//! Streams:
13//! `run` yields `QueryResult` items. Higher-level helpers (`fetch`, `execute`) filter & map only the
14//! variants they care about, propagating errors while discarding unrelated items.
15//!
16//! Lifetimes:
17//! `fetch` ties the stream lifetime `'s` to `&'s mut self`, ensuring the executor outlives row decoding.
18//!
19//! Fallbacks:
20//! `append` always emits an INSERT when the driver has no dedicated append/merge support.
21//!
22//! Awaiting:
//! Every method returning a Future or Stream must be awaited / polled to completion. Some drivers
//! might perform effects before awaiting, but relying on that is not portable across drivers.
25#![doc = r#"
26The `Executor` trait provides a uniform, async/stream-based interface for:
27- Preparing parameterized queries (`prepare`)
28- Running arbitrary queries yielding heterogeneous results (`run`)
29- Convenience adapters to obtain only rows (`fetch`) or only affected counts (`execute`)
30- Bulk data ingestion (`append`), with graceful fallback for drivers lacking append semantics.
31
32Streams:
33`run` yields `QueryResult` items. Higher-level helpers (`fetch`, `execute`) filter & map only the
34variants they care about, propagating errors while discarding unrelated items.
35
36Lifetimes:
37`fetch` ties the stream lifetime `'s` to `&'s mut self`, ensuring the executor outlives row decoding.
38
39Fallbacks:
40`append` always emits an INSERT when the driver has no dedicated append/merge support.
41
42Awaiting:
Every method returning a Future or Stream must be awaited / polled to completion. Some drivers
might perform effects before awaiting, but relying on that is not portable across drivers.
45"#]
46
47use crate::{
48 AsQuery, Driver, Entity, Query, QueryResult, Result, RowLabeled, RowsAffected,
49 stream::{Stream, StreamExt, TryStreamExt},
50 writer::SqlWriter,
51};
52use std::future::Future;
53
54/// Async query executor bound to a concrete `Driver`.
55///
56/// Responsibilities:
57/// - Translate high-level operations into driver queries
58/// - Stream results without buffering the entire result set (if possible)
59/// - Provide ergonomic helpers for common patterns
60///
61/// Implementors typically wrap a connection or pooled handle.
62pub trait Executor: Send + Sized {
63 /// Underlying driver type supplying SQL dialect + I/O.
64 type Driver: Driver;
65
66 /// Access the driver instance.
67 fn driver(&self) -> &Self::Driver;
68
69 /// Prepare a query (e.g. statement caching / parameter binding) returning a `Query`.
70 ///
71 /// Await/Consume:
72 /// - Must be awaited; preparation may allocate resources on the driver.
73 ///
74 /// Errors:
75 /// - Driver-specific preparation failures.
76 fn prepare(
77 &mut self,
78 query: String,
79 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send;
80
81 /// Run an already prepared query, streaming heterogeneous `QueryResult` items.
82 ///
83 /// Await/Consume:
84 /// - You must drive the returned stream to completion (or until you intentionally stop).
85 ///
86 /// Stream Items:
87 /// - `QueryResult::Row` for each produced row.
88 /// - `QueryResult::Affected` for write operations (may appear before/after rows depending on driver).
89 ///
90 /// Errors:
91 /// - Emitted inline in the stream; consumers should use `TryStreamExt`.
92 fn run<'s>(
93 &'s mut self,
94 query: impl AsQuery<Self::Driver> + 's,
95 ) -> impl Stream<Item = Result<QueryResult>> + Send;
96
97 /// Run a query and stream only labeled rows, filtering out non-row results.
98 ///
99 /// Await/Consume:
100 /// - Consume the stream fully if you expect to release underlying resources cleanly.
101 ///
102 /// Each error from `run` is forwarded; affected-count results are discarded.
103 fn fetch<'s>(
104 &'s mut self,
105 query: impl AsQuery<Self::Driver> + 's,
106 ) -> impl Stream<Item = Result<RowLabeled>> + Send + 's {
107 self.run(query).filter_map(|v| async move {
108 match v {
109 Ok(QueryResult::Row(v)) => Some(Ok(v)),
110 Err(e) => Some(Err(e)),
111 _ => None,
112 }
113 })
114 }
115
116 /// Execute a query and return a single aggregated `RowsAffected`.
117 ///
118 /// Await/Consume:
119 /// - Must be awaited; no side-effects are guaranteed until completion.
120 ///
121 /// If a driver returns multiple `QueryResult::Affected` values, they are combined via `FromIterator`
122 /// (driver/module must provide the appropriate implementation).
123 fn execute<'s>(
124 &'s mut self,
125 query: impl AsQuery<Self::Driver> + 's,
126 ) -> impl Future<Output = Result<RowsAffected>> + Send {
127 self.run(query)
128 .filter_map(|v| async move {
129 match v {
130 Ok(QueryResult::Affected(v)) => Some(Ok(v)),
131 Err(e) => Some(Err(e)),
132 _ => None,
133 }
134 })
135 .try_collect()
136 }
137
138 /// Append entities to a table.
139 ///
140 /// Await/Consume:
141 /// - Must be awaited; insertion may be deferred until polled.
142 ///
143 /// Semantics:
144 /// - Uses driver append/ingest feature when supported.
145 /// - Falls back to plain INSERT statements via `sql_writer().write_insert(..., false)` otherwise.
146 ///
147 /// Returns:
148 /// - Total number of inserted rows.
149 fn append<'a, E, It>(
150 &mut self,
151 entities: It,
152 ) -> impl Future<Output = Result<RowsAffected>> + Send
153 where
154 E: Entity + 'a,
155 It: IntoIterator<Item = &'a E> + Send,
156 {
157 let mut query = String::new();
158 self.driver()
159 .sql_writer()
160 .write_insert(&mut query, entities, false);
161 self.execute(query)
162 }
163}