tank_core/executor.rs
1//! Execution abstraction wrapping a database `Driver`.
2//! NOTE: All returned Futures and Streams MUST be awaited / fully consumed.
3//! Some drivers may side-effect early, but for portability and correctness
4//! always await the future or exhaust the stream.
5//!
6//! The `Executor` trait provides a uniform, async/stream-based interface for:
7//! - Preparing parameterized queries (`prepare`)
8//! - Running arbitrary queries yielding heterogeneous results (`run`)
9//! - Convenience adapters to obtain only rows (`fetch`) or only affected counts (`execute`)
10//! - Bulk data ingestion (`append`), with graceful fallback for drivers lacking append semantics.
11//!
12//! Streams:
13//! `run` yields `QueryResult` items. Higher-level helpers (`fetch`, `execute`) filter & map only the
14//! variants they care about, propagating errors while discarding unrelated items.
15//!
16//! Lifetimes:
17//! `fetch` ties the stream lifetime `'s` to `&'s mut self`, ensuring the executor outlives row decoding.
18//!
19//! Fallbacks:
20//! `append` always emits an INSERT when the driver has no dedicated append/merge support.
21//!
22//! Awaiting:
23//! Every method returning a Future or Stream must be awaited / polled to completion. Some drivers
24//! might perform effects before awaiting, but relying on that is not portable across drivers.
25#![doc = r#"
26The `Executor` trait provides a uniform, async/stream-based interface for:
27- Preparing parameterized queries (`prepare`)
28- Running arbitrary queries yielding heterogeneous results (`run`)
29- Convenience adapters to obtain only rows (`fetch`) or only affected counts (`execute`)
30- Bulk data ingestion (`append`), with graceful fallback for drivers lacking append semantics.
31
32Streams:
33`run` yields `QueryResult` items. Higher-level helpers (`fetch`, `execute`) filter & map only the
34variants they care about, propagating errors while discarding unrelated items.
35
36Lifetimes:
37`fetch` ties the stream lifetime `'s` to `&'s mut self`, ensuring the executor outlives row decoding.
38
39Fallbacks:
40`append` always emits an INSERT when the driver has no dedicated append/merge support.
41
42Awaiting:
43Every method returning a Future or Stream must be awaited / polled to completion. Some drivers
44might perform effects before awaiting, but relying on that is not portable across drivers.
45"#]
46
47use crate::{
48 Driver, Entity, Query, QueryResult, Result, RowLabeled, RowsAffected,
49 stream::{Stream, StreamExt, TryStreamExt},
50 writer::SqlWriter,
51};
52use std::future::Future;
53
54/// Async query executor bound to a concrete `Driver`.
55///
56/// Responsibilities:
57/// - Translate high-level operations into driver queries
58/// - Stream results without buffering the entire result set (if possible)
59/// - Provide ergonomic helpers for common patterns
60///
61/// Implementors typically wrap a connection or pooled handle.
62pub trait Executor: Send + Sized {
63 /// Underlying driver type supplying SQL dialect + I/O.
64 type Driver: Driver;
65
66 /// Access the driver instance.
67 fn driver(&self) -> &Self::Driver;
68
69 /// Prepare a query (e.g. statement caching / parameter binding) returning a `Query`.
70 ///
71 /// Await/Consume:
72 /// - Must be awaited; preparation may allocate resources on the driver.
73 ///
74 /// Errors:
75 /// - Driver-specific preparation failures.
76 fn prepare(
77 &mut self,
78 query: String,
79 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send;
80
81 /// Run an already prepared query, streaming heterogeneous `QueryResult` items.
82 ///
83 /// Await/Consume:
84 /// - You must drive the returned stream to completion (or until you intentionally stop).
85 ///
86 /// Stream Items:
87 /// - `QueryResult::Row` for each produced row.
88 /// - `QueryResult::Affected` for write operations (may appear before/after rows depending on driver).
89 ///
90 /// Errors:
91 /// - Emitted inline in the stream; consumers should use `TryStreamExt`.
92 fn run(&mut self, query: Query<Self::Driver>)
93 -> impl Stream<Item = Result<QueryResult>> + Send;
94
95 /// Run a query and stream only labeled rows, filtering out non-row results.
96 ///
97 /// Await/Consume:
98 /// - Consume the stream fully if you expect to release underlying resources cleanly.
99 ///
100 /// Each error from `run` is forwarded; affected-count results are discarded.
101 fn fetch<'s>(
102 &'s mut self,
103 query: Query<Self::Driver>,
104 ) -> impl Stream<Item = Result<RowLabeled>> + Send + 's {
105 self.run(query).filter_map(|v| async move {
106 match v {
107 Ok(QueryResult::Row(v)) => Some(Ok(v)),
108 Err(e) => Some(Err(e)),
109 _ => None,
110 }
111 })
112 }
113
114 /// Execute a query and return a single aggregated `RowsAffected`.
115 ///
116 /// Await/Consume:
117 /// - Must be awaited; no side-effects are guaranteed until completion.
118 ///
119 /// If a driver returns multiple `QueryResult::Affected` values, they are combined via `FromIterator`
120 /// (driver/module must provide the appropriate implementation).
121 fn execute(
122 &mut self,
123 query: Query<Self::Driver>,
124 ) -> impl Future<Output = Result<RowsAffected>> + Send {
125 self.run(query)
126 .filter_map(|v| async move {
127 match v {
128 Ok(QueryResult::Affected(v)) => Some(Ok(v)),
129 Err(e) => Some(Err(e)),
130 _ => None,
131 }
132 })
133 .try_collect()
134 }
135
136 /// Append entities to a table.
137 ///
138 /// Await/Consume:
139 /// - Must be awaited; insertion may be deferred until polled.
140 ///
141 /// Semantics:
142 /// - Uses driver append/ingest feature when supported.
143 /// - Falls back to plain INSERT statements via `sql_writer().write_insert(..., false)` otherwise.
144 ///
145 /// Returns:
146 /// - Total number of inserted rows.
147 fn append<'a, E, It>(
148 &mut self,
149 entities: It,
150 ) -> impl Future<Output = Result<RowsAffected>> + Send
151 where
152 E: Entity + 'a,
153 It: IntoIterator<Item = &'a E> + Send,
154 {
155 let mut query = String::new();
156 self.driver()
157 .sql_writer()
158 .write_insert(&mut query, entities, false);
159 self.execute(query.into())
160 }
161}