tank_core/executor.rs
1use crate::{
2 AsQuery, Driver, Entity, Query, QueryResult, Result, RowLabeled, RowsAffected,
3 stream::{Stream, StreamExt, TryStreamExt},
4 writer::SqlWriter,
5};
6use std::future::Future;
7
8/// Async query executor bound to a concrete `Driver`.
9///
10/// Responsibilities:
11/// - Translate high-level operations into driver queries
12/// - Stream results without buffering the entire result set (if possible)
13/// - Provide ergonomic helpers for common patterns
14///
15/// Implementors typically wrap a connection or pooled handle.
16pub trait Executor: Send + Sized {
17 /// Underlying driver type supplying SQL dialect + I/O.
18 type Driver: Driver;
19
20 /// Access the driver instance.
21 fn driver(&self) -> &Self::Driver;
22
23 /// Prepare a query (e.g. statement caching / parameter binding) returning a `Query`.
24 ///
25 /// Await/Consume:
26 /// - Must be awaited; preparation may allocate resources on the driver.
27 ///
28 /// Errors:
29 /// - Driver-specific preparation failures.
30 fn prepare(
31 &mut self,
32 query: String,
33 ) -> impl Future<Output = Result<Query<Self::Driver>>> + Send;
34
35 /// Run an already prepared query, streaming heterogeneous `QueryResult` items.
36 ///
37 /// Await/Consume:
38 /// - You must drive the returned stream to completion (or until you intentionally stop).
39 ///
40 /// Stream Items:
41 /// - `QueryResult::Row` for each produced row.
42 /// - `QueryResult::Affected` for write operations (may appear before/after rows depending on driver).
43 ///
44 /// Errors:
45 /// - Emitted inline in the stream; consumers should use `TryStreamExt`.
46 fn run<'s>(
47 &'s mut self,
48 query: impl AsQuery<Self::Driver> + 's,
49 ) -> impl Stream<Item = Result<QueryResult>> + Send;
50
51 /// Run a query and stream only labeled rows, filtering out non-row results.
52 ///
53 /// Await/Consume:
54 /// - Consume the stream fully if you expect to release underlying resources cleanly.
55 ///
56 /// Each error from `run` is forwarded; affected-count results are discarded.
57 fn fetch<'s>(
58 &'s mut self,
59 query: impl AsQuery<Self::Driver> + 's,
60 ) -> impl Stream<Item = Result<RowLabeled>> + Send + 's {
61 self.run(query).filter_map(|v| async move {
62 match v {
63 Ok(QueryResult::Row(v)) => Some(Ok(v)),
64 Err(e) => Some(Err(e)),
65 _ => None,
66 }
67 })
68 }
69
70 /// Execute a query and return a single aggregated `RowsAffected`.
71 ///
72 /// Await/Consume:
73 /// - Must be awaited; no side-effects are guaranteed until completion.
74 ///
75 /// If a driver returns multiple `QueryResult::Affected` values, they are combined via `FromIterator`
76 /// (driver/module must provide the appropriate implementation).
77 fn execute<'s>(
78 &'s mut self,
79 query: impl AsQuery<Self::Driver> + 's,
80 ) -> impl Future<Output = Result<RowsAffected>> + Send {
81 self.run(query)
82 .filter_map(|v| async move {
83 match v {
84 Ok(QueryResult::Affected(v)) => Some(Ok(v)),
85 Err(e) => Some(Err(e)),
86 _ => None,
87 }
88 })
89 .try_collect()
90 }
91
92 /// Append entities to a table.
93 ///
94 /// Await/Consume:
95 /// - Must be awaited; insertion may be deferred until polled.
96 ///
97 /// Semantics:
98 /// - Uses driver append/ingest feature when supported.
99 /// - Falls back to plain INSERT statements via `sql_writer().write_insert(..., false)` otherwise.
100 ///
101 /// Returns:
102 /// - Total number of inserted rows.
103 fn append<'a, E, It>(
104 &mut self,
105 entities: It,
106 ) -> impl Future<Output = Result<RowsAffected>> + Send
107 where
108 E: Entity + 'a,
109 It: IntoIterator<Item = &'a E> + Send,
110 {
111 let mut query = String::new();
112 self.driver()
113 .sql_writer()
114 .write_insert(&mut query, entities, false);
115 self.execute(query)
116 }
117}