Skip to main content

bsql_core/
executor.rs

1//! The `Executor` trait — the runtime contract between generated code and the pool.
2//!
3//! Code generated by `bsql::query!` calls methods on this trait. `Pool`,
4//! `PoolConnection`, and `Transaction` all implement it.
5//!
6//! The `query_raw` / `query_raw_readonly` methods use the bsql-driver's arena-based
7//! row storage. Generated code decodes columns from `Row` via typed getters.
8
9use bsql_driver_postgres::arena::{acquire_arena, release_arena};
10use bsql_driver_postgres::codec::Encode;
11use bsql_driver_postgres::{Arena, QueryResult};
12
13use crate::error::{BsqlError, BsqlResult};
14use crate::pool::{Pool, PoolConnection};
15use crate::transaction::Transaction;
16
17/// Owned query result that carries its arena alongside the result metadata.
18///
19/// Generated code calls `.row(i)` to access individual rows. This struct
20/// bundles the arena with the result so callsites don't manage arenas manually.
21pub struct OwnedResult {
22    pub result: QueryResult,
23    arena: Arena,
24}
25
26impl OwnedResult {
27    /// Create from a result and its arena.
28    pub(crate) fn new(result: QueryResult, arena: Arena) -> Self {
29        Self { result, arena }
30    }
31
32    /// Number of rows.
33    pub fn len(&self) -> usize {
34        self.result.len()
35    }
36
37    /// Whether the result set is empty.
38    pub fn is_empty(&self) -> bool {
39        self.result.is_empty()
40    }
41
42    /// Get a row by index.
43    pub fn row(&self, idx: usize) -> bsql_driver_postgres::Row<'_> {
44        self.result.row(idx, &self.arena)
45    }
46
47    /// Iterate over rows.
48    pub fn iter(&self) -> impl Iterator<Item = bsql_driver_postgres::Row<'_>> {
49        self.result.rows(&self.arena)
50    }
51}
52
53impl Drop for OwnedResult {
54    fn drop(&mut self) {
55        // Swap out the arena to return it to the thread-local pool.
56        // Arena implements Default, so take() avoids the explicit Arena::new().
57        let arena = std::mem::take(&mut self.arena);
58        release_arena(arena);
59    }
60}
61
62/// Execute a prepared query and return rows.
63///
64/// This trait is sealed — it cannot be implemented outside of bsql-core.
65/// The generated code calls `query_raw`, `query_raw_readonly`, and
66/// `execute_raw` on `&Pool`, `&PoolConnection`, or `&Transaction`.
67pub trait Executor: sealed::Sealed {
68    /// Execute a query and return all rows.
69    fn query_raw(
70        &self,
71        sql: &str,
72        sql_hash: u64,
73        params: &[&(dyn Encode + Sync)],
74    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send;
75
76    /// Execute a read-only query. May route to replicas in the future.
77    fn query_raw_readonly(
78        &self,
79        sql: &str,
80        sql_hash: u64,
81        params: &[&(dyn Encode + Sync)],
82    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send;
83
84    /// Execute a query and return the number of affected rows.
85    fn execute_raw(
86        &self,
87        sql: &str,
88        sql_hash: u64,
89        params: &[&(dyn Encode + Sync)],
90    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send;
91}
92
93mod sealed {
94    pub trait Sealed {}
95    impl Sealed for super::Pool {}
96    impl Sealed for super::PoolConnection {}
97    impl Sealed for super::Transaction {}
98}
99
100impl Executor for Pool {
101    async fn query_raw(
102        &self,
103        sql: &str,
104        sql_hash: u64,
105        params: &[&(dyn Encode + Sync)],
106    ) -> BsqlResult<OwnedResult> {
107        let mut guard = self.inner.acquire().await.map_err(BsqlError::from)?;
108        let mut arena = acquire_arena();
109        let result = guard
110            .query(sql, sql_hash, params, &mut arena)
111            .await
112            .map_err(BsqlError::from_driver_query)?;
113        Ok(OwnedResult::new(result, arena))
114    }
115
116    async fn query_raw_readonly(
117        &self,
118        sql: &str,
119        sql_hash: u64,
120        params: &[&(dyn Encode + Sync)],
121    ) -> BsqlResult<OwnedResult> {
122        // Route to replica pool when configured; fall back to primary otherwise.
123        let pool = self.read_pool.as_ref().unwrap_or(&self.inner);
124        let mut guard = pool.acquire().await.map_err(BsqlError::from)?;
125        let mut arena = acquire_arena();
126        let result = guard
127            .query(sql, sql_hash, params, &mut arena)
128            .await
129            .map_err(BsqlError::from_driver_query)?;
130        Ok(OwnedResult::new(result, arena))
131    }
132
133    async fn execute_raw(
134        &self,
135        sql: &str,
136        sql_hash: u64,
137        params: &[&(dyn Encode + Sync)],
138    ) -> BsqlResult<u64> {
139        let mut guard = self.inner.acquire().await.map_err(BsqlError::from)?;
140        guard
141            .execute(sql, sql_hash, params)
142            .await
143            .map_err(BsqlError::from_driver_query)
144    }
145}
146
147impl Executor for PoolConnection {
148    async fn query_raw(
149        &self,
150        sql: &str,
151        sql_hash: u64,
152        params: &[&(dyn Encode + Sync)],
153    ) -> BsqlResult<OwnedResult> {
154        let mut guard = self.inner.lock().await;
155        let mut arena = acquire_arena();
156        let result = guard
157            .query(sql, sql_hash, params, &mut arena)
158            .await
159            .map_err(BsqlError::from_driver_query)?;
160        Ok(OwnedResult::new(result, arena))
161    }
162
163    async fn query_raw_readonly(
164        &self,
165        sql: &str,
166        sql_hash: u64,
167        params: &[&(dyn Encode + Sync)],
168    ) -> BsqlResult<OwnedResult> {
169        self.query_raw(sql, sql_hash, params).await
170    }
171
172    async fn execute_raw(
173        &self,
174        sql: &str,
175        sql_hash: u64,
176        params: &[&(dyn Encode + Sync)],
177    ) -> BsqlResult<u64> {
178        let mut guard = self.inner.lock().await;
179        guard
180            .execute(sql, sql_hash, params)
181            .await
182            .map_err(BsqlError::from_driver_query)
183    }
184}
185
186impl Executor for Transaction {
187    async fn query_raw(
188        &self,
189        sql: &str,
190        sql_hash: u64,
191        params: &[&(dyn Encode + Sync)],
192    ) -> BsqlResult<OwnedResult> {
193        self.query_inner(sql, sql_hash, params).await
194    }
195
196    async fn query_raw_readonly(
197        &self,
198        sql: &str,
199        sql_hash: u64,
200        params: &[&(dyn Encode + Sync)],
201    ) -> BsqlResult<OwnedResult> {
202        self.query_raw(sql, sql_hash, params).await
203    }
204
205    async fn execute_raw(
206        &self,
207        sql: &str,
208        sql_hash: u64,
209        params: &[&(dyn Encode + Sync)],
210    ) -> BsqlResult<u64> {
211        self.execute_inner(sql, sql_hash, params).await
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218    use bsql_driver_postgres::arena::{acquire_arena, release_arena};
219    use bsql_driver_postgres::{ColumnDesc, QueryResult};
220    use std::sync::Arc;
221
222    /// Helper: build an OwnedResult with `n` rows and `num_cols` columns.
223    /// Each column offset entry is a dummy (0, 0) pair — sufficient for
224    /// testing len/is_empty/row-count without decoding real data.
225    fn make_owned_result(num_rows: usize, num_cols: usize) -> OwnedResult {
226        let arena = acquire_arena();
227        let cols: Arc<[ColumnDesc]> = (0..num_cols)
228            .map(|i| ColumnDesc {
229                name: format!("c{i}").into(),
230                type_oid: 23, // int4
231                type_size: 4,
232                table_oid: 0,
233                column_id: 0,
234            })
235            .collect::<Vec<_>>()
236            .into();
237
238        let col_offsets: Vec<(usize, i32)> = vec![(0, -1); num_rows * num_cols]; // NULL columns
239        let result = QueryResult::from_parts(col_offsets, num_cols, cols, 0);
240        OwnedResult::new(result, arena)
241    }
242
243    // --- OwnedResult::new ---
244
245    #[test]
246    fn owned_result_new_zero_rows() {
247        let owned = make_owned_result(0, 2);
248        assert_eq!(owned.len(), 0);
249        assert!(owned.is_empty());
250    }
251
252    #[test]
253    fn owned_result_new_single_row() {
254        let owned = make_owned_result(1, 3);
255        assert_eq!(owned.len(), 1);
256        assert!(!owned.is_empty());
257    }
258
259    #[test]
260    fn owned_result_new_multiple_rows() {
261        let owned = make_owned_result(5, 2);
262        assert_eq!(owned.len(), 5);
263        assert!(!owned.is_empty());
264    }
265
266    // --- OwnedResult::row ---
267
268    #[test]
269    fn owned_result_row_access() {
270        let owned = make_owned_result(3, 2);
271        // Should not panic for valid indices
272        let _r0 = owned.row(0);
273        let _r1 = owned.row(1);
274        let _r2 = owned.row(2);
275    }
276
277    #[test]
278    #[should_panic]
279    fn owned_result_row_out_of_bounds_panics() {
280        let owned = make_owned_result(2, 1);
281        let _r = owned.row(2); // out of bounds
282    }
283
284    // --- OwnedResult::iter ---
285
286    #[test]
287    fn owned_result_iter_count() {
288        let owned = make_owned_result(4, 2);
289        let count = owned.iter().count();
290        assert_eq!(count, 4);
291    }
292
293    #[test]
294    fn owned_result_iter_empty() {
295        let owned = make_owned_result(0, 2);
296        let count = owned.iter().count();
297        assert_eq!(count, 0);
298    }
299
300    // --- OwnedResult::Drop releases arena back to pool ---
301
302    #[test]
303    fn owned_result_drop_releases_arena() {
304        // Acquire an arena, wrap it in OwnedResult, drop it.
305        // After drop, acquiring should succeed (arena was returned to pool).
306        let owned = make_owned_result(1, 1);
307        drop(owned);
308        // If the arena was released, we can acquire again without issue.
309        let arena = acquire_arena();
310        release_arena(arena);
311    }
312
313    // --- OwnedResult with zero columns ---
314
315    #[test]
316    fn owned_result_zero_columns() {
317        // Commands like INSERT without RETURNING have 0 columns
318        let arena = acquire_arena();
319        let cols: Arc<[ColumnDesc]> = Arc::from(Vec::new());
320        let result = QueryResult::from_parts(vec![], 0, cols, 42);
321        let owned = OwnedResult::new(result, arena);
322        assert_eq!(owned.len(), 0);
323        assert!(owned.is_empty());
324        assert_eq!(owned.result.affected_rows(), 42);
325    }
326}