Skip to main content

bsql_core/
executor.rs

1//! The `Executor` trait — the runtime contract between generated code and the pool.
2//!
3//! Code generated by `bsql::query!` calls methods on this trait. `Pool`,
4//! `PoolConnection`, and `Transaction` all implement it.
5//!
6//! The `query_raw` / `query_raw_readonly` methods use the bsql-driver's arena-based
7//! row storage. Generated code decodes columns from `Row` via typed getters.
8
9use bsql_driver_postgres::arena::release_arena;
10use bsql_driver_postgres::codec::Encode;
11use bsql_driver_postgres::{Arena, QueryResult};
12
13use crate::error::{BsqlError, BsqlResult};
14use crate::pool::{Pool, PoolConnection};
15use crate::transaction::Transaction;
16
/// Owned query result that carries its arena alongside the result metadata.
///
/// Generated code calls `.row(i)` to access individual rows. This struct
/// bundles the arena with the result so callsites don't manage arenas manually.
///
/// Dropping an `OwnedResult` hands the arena — and the result's data buffer,
/// when it has one — back to the driver through its `Drop` implementation.
pub struct OwnedResult {
    /// Driver-level result. Public so callers can reach driver accessors
    /// directly (e.g. `affected_rows()`).
    pub result: QueryResult,
    /// Arena passed to `QueryResult::row` / `QueryResult::rows` when rows are
    /// read. `Arena::empty()` for results built via `without_arena`.
    arena: Arena,
}
25
26impl std::fmt::Debug for OwnedResult {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("OwnedResult")
29            .field("rows", &self.result.len())
30            .finish()
31    }
32}
33
34impl OwnedResult {
35    /// Create without arena — for queries that use data_buf instead of arena.
36    /// Zero allocation: Arena::empty() allocates nothing.
37    pub(crate) fn without_arena(result: QueryResult) -> Self {
38        Self {
39            result,
40            arena: Arena::empty(),
41        }
42    }
43
44    /// Number of rows.
45    pub fn len(&self) -> usize {
46        self.result.len()
47    }
48
49    /// Whether the result set is empty.
50    pub fn is_empty(&self) -> bool {
51        self.result.is_empty()
52    }
53
54    /// Get a row by index.
55    pub fn row(&self, idx: usize) -> bsql_driver_postgres::Row<'_> {
56        self.result.row(idx, &self.arena)
57    }
58
59    /// Iterate over rows.
60    pub fn iter(&self) -> impl Iterator<Item = bsql_driver_postgres::Row<'_>> {
61        self.result.rows(&self.arena)
62    }
63}
64
65impl Drop for OwnedResult {
66    fn drop(&mut self) {
67        // Return arena to thread-local pool.
68        let arena = std::mem::take(&mut self.arena);
69        release_arena(arena);
70        // Return data buffer to thread-local pool for reuse by next query.
71        if let Some(buf) = self.result.take_data_buf() {
72            bsql_driver_postgres::release_resp_buf(buf);
73        }
74    }
75}
76
/// Runtime contract between generated code and a connection source:
/// execute a prepared query and return rows (or an affected-row count).
///
/// The generated code calls `query_raw`, `query_raw_readonly`, and
/// `execute_raw` on `&Pool`, `&PoolConnection`, or `&Transaction`.
///
/// When the `async` feature is enabled and the pool connects via TCP,
/// `acquire_async()` returns true async connections that use tokio I/O
/// instead of blocking the worker thread. UDS connections remain sync
/// (sub-millisecond, acceptable for tokio).
///
/// All three methods take the same arguments:
///
/// * `sql` — the statement text.
/// * `sql_hash` — a hash that accompanies `sql` and is forwarded unchanged to
///   the driver (presumably a precomputed hash of the SQL text used to look
///   up the prepared statement — confirm against the driver).
/// * `params` — the bind parameters, each encodable through `Encode`.
pub trait Executor {
    /// Execute a query and return all rows.
    ///
    /// Failures are reported through `BsqlResult`.
    fn query_raw(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult>;

    /// Execute a read-only query. May route to replicas in the future.
    ///
    /// In this file, `Pool` uses its `read_pool` when one is set and its
    /// primary pool otherwise; `PoolConnection` and `Transaction` simply
    /// forward to `query_raw`.
    fn query_raw_readonly(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult>;

    /// Execute a query and return the number of affected rows.
    ///
    /// Failures are reported through `BsqlResult`.
    fn execute_raw(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<u64>;
}
111
/// `Executor` for `Pool` when the `async` feature is enabled.
///
/// Connections come from `acquire_async()`, which auto-detects UDS vs TCP:
/// UDS gets a sync Connection (fast, sub-ms), TCP gets an AsyncConnection
/// (true async I/O via tokio, doesn't block the worker thread). The
/// `query_async` / `execute_async` methods on the guard dispatch to the
/// matching backend.
///
/// The trait methods are synchronous, so each future is driven to completion
/// with `Handle::block_on` inside `tokio::task::block_in_place`, which lets
/// the current worker block while other tokio tasks keep making progress.
///
/// NOTE(review): per tokio's documentation, `block_in_place` panics on a
/// current-thread runtime and `Handle::current()` panics when no runtime is
/// active — these methods therefore expect a multi-threaded tokio runtime.
#[cfg(feature = "async")]
impl Executor for Pool {
    /// Acquire a connection from the primary pool, run the query, and wrap
    /// the driver result.
    #[inline]
    fn query_raw(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult> {
        // Step 1: check out a connection.
        let acquired = tokio::task::block_in_place(|| {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(self.inner.acquire_async())
        });
        let mut conn = acquired.map_err(BsqlError::from)?;

        // Step 2: run the statement on it.
        let fetched = tokio::task::block_in_place(|| {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(conn.query_async(sql, sql_hash, params))
        });
        let rows = fetched.map_err(BsqlError::from_driver_query)?;

        Ok(OwnedResult::without_arena(rows))
    }

    /// Same as `query_raw`, but the connection is taken from `read_pool`
    /// when one is configured, and from the primary pool otherwise.
    #[inline]
    fn query_raw_readonly(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult> {
        let source = self.read_pool.as_ref().unwrap_or(&self.inner);

        let acquired = tokio::task::block_in_place(|| {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(source.acquire_async())
        });
        let mut conn = acquired.map_err(BsqlError::from)?;

        let fetched = tokio::task::block_in_place(|| {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(conn.query_async(sql, sql_hash, params))
        });
        let rows = fetched.map_err(BsqlError::from_driver_query)?;

        Ok(OwnedResult::without_arena(rows))
    }

    /// Acquire a connection from the primary pool and run a statement,
    /// returning the affected-row count reported by the driver.
    #[inline]
    fn execute_raw(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<u64> {
        let acquired = tokio::task::block_in_place(|| {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(self.inner.acquire_async())
        });
        let mut conn = acquired.map_err(BsqlError::from)?;

        let outcome = tokio::task::block_in_place(|| {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(conn.execute_async(sql, sql_hash, params))
        });
        outcome.map_err(BsqlError::from_driver_query)
    }
}
177
178/// When async feature is NOT enabled, use plain sync `acquire()` + `query()`.
179#[cfg(not(feature = "async"))]
180impl Executor for Pool {
181    #[inline]
182    fn query_raw(
183        &self,
184        sql: &str,
185        sql_hash: u64,
186        params: &[&(dyn Encode + Sync)],
187    ) -> BsqlResult<OwnedResult> {
188        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
189        let result = guard
190            .query(sql, sql_hash, params)
191            .map_err(BsqlError::from_driver_query)?;
192        Ok(OwnedResult::without_arena(result))
193    }
194
195    #[inline]
196    fn query_raw_readonly(
197        &self,
198        sql: &str,
199        sql_hash: u64,
200        params: &[&(dyn Encode + Sync)],
201    ) -> BsqlResult<OwnedResult> {
202        let pool = self.read_pool.as_ref().unwrap_or(&self.inner);
203        let mut guard = pool.acquire().map_err(BsqlError::from)?;
204        let result = guard
205            .query(sql, sql_hash, params)
206            .map_err(BsqlError::from_driver_query)?;
207        Ok(OwnedResult::without_arena(result))
208    }
209
210    #[inline]
211    fn execute_raw(
212        &self,
213        sql: &str,
214        sql_hash: u64,
215        params: &[&(dyn Encode + Sync)],
216    ) -> BsqlResult<u64> {
217        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
218        guard
219            .execute(sql, sql_hash, params)
220            .map_err(BsqlError::from_driver_query)
221    }
222}
223
224impl Executor for PoolConnection {
225    #[inline]
226    fn query_raw(
227        &self,
228        sql: &str,
229        sql_hash: u64,
230        params: &[&(dyn Encode + Sync)],
231    ) -> BsqlResult<OwnedResult> {
232        let mut guard = self.inner.lock().unwrap_or_else(|e| e.into_inner());
233        let result = guard
234            .query(sql, sql_hash, params)
235            .map_err(BsqlError::from_driver_query)?;
236        Ok(OwnedResult::without_arena(result))
237    }
238
239    #[inline]
240    fn query_raw_readonly(
241        &self,
242        sql: &str,
243        sql_hash: u64,
244        params: &[&(dyn Encode + Sync)],
245    ) -> BsqlResult<OwnedResult> {
246        self.query_raw(sql, sql_hash, params)
247    }
248
249    #[inline]
250    fn execute_raw(
251        &self,
252        sql: &str,
253        sql_hash: u64,
254        params: &[&(dyn Encode + Sync)],
255    ) -> BsqlResult<u64> {
256        let mut guard = self.inner.lock().unwrap_or_else(|e| e.into_inner());
257        guard
258            .execute(sql, sql_hash, params)
259            .map_err(BsqlError::from_driver_query)
260    }
261}
262
263impl Executor for Transaction {
264    fn query_raw(
265        &self,
266        sql: &str,
267        sql_hash: u64,
268        params: &[&(dyn Encode + Sync)],
269    ) -> BsqlResult<OwnedResult> {
270        self.query_inner(sql, sql_hash, params)
271    }
272
273    #[inline]
274    fn query_raw_readonly(
275        &self,
276        sql: &str,
277        sql_hash: u64,
278        params: &[&(dyn Encode + Sync)],
279    ) -> BsqlResult<OwnedResult> {
280        self.query_raw(sql, sql_hash, params)
281    }
282
283    #[inline]
284    fn execute_raw(
285        &self,
286        sql: &str,
287        sql_hash: u64,
288        params: &[&(dyn Encode + Sync)],
289    ) -> BsqlResult<u64> {
290        self.execute_inner(sql, sql_hash, params)
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use bsql_driver_postgres::arena::{acquire_arena, release_arena};
    use bsql_driver_postgres::{ColumnDesc, QueryResult};
    use std::sync::Arc;

    /// Fixture: an arena-backed `OwnedResult` with `num_rows` rows of
    /// `num_cols` int4 columns named `c0`, `c1`, ….
    ///
    /// Every cell's offset entry is `(0, -1)` (a NULL column), which is all
    /// that is needed to exercise len / is_empty / row access without
    /// decoding real data.
    fn make_owned_result(num_rows: usize, num_cols: usize) -> OwnedResult {
        let arena = acquire_arena();

        let mut descs = Vec::with_capacity(num_cols);
        for i in 0..num_cols {
            descs.push(ColumnDesc {
                name: format!("c{i}").into(),
                type_oid: 23, // int4
                type_size: 4,
                table_oid: 0,
                column_id: 0,
            });
        }
        let cols: Arc<[ColumnDesc]> = descs.into();

        let cell_count = num_rows * num_cols;
        let col_offsets: Vec<(usize, i32)> = vec![(0, -1); cell_count]; // NULL columns
        let result = QueryResult::from_parts(col_offsets, num_cols, cols, 0);
        OwnedResult { result, arena }
    }

    /// Fixture: an empty column-descriptor list.
    fn no_columns() -> Arc<[ColumnDesc]> {
        Arc::from(Vec::new())
    }

    /// Fixture: a row-less, column-less result wrapped via `without_arena`.
    fn empty_without_arena() -> OwnedResult {
        let result = QueryResult::from_parts(vec![], 0, no_columns(), 0);
        OwnedResult::without_arena(result)
    }

    // --- OwnedResult: len / is_empty ---

    #[test]
    fn owned_result_new_zero_rows() {
        let owned = make_owned_result(0, 2);
        assert!(owned.is_empty());
        assert_eq!(owned.len(), 0);
    }

    #[test]
    fn owned_result_new_single_row() {
        let owned = make_owned_result(1, 3);
        assert!(!owned.is_empty());
        assert_eq!(owned.len(), 1);
    }

    #[test]
    fn owned_result_new_multiple_rows() {
        let owned = make_owned_result(5, 2);
        assert!(!owned.is_empty());
        assert_eq!(owned.len(), 5);
    }

    // --- OwnedResult::row ---

    #[test]
    fn owned_result_row_access() {
        let owned = make_owned_result(3, 2);
        // Every valid index must be readable without panicking.
        for idx in 0..3 {
            let _row = owned.row(idx);
        }
    }

    #[test]
    #[should_panic]
    fn owned_result_row_out_of_bounds_panics() {
        let owned = make_owned_result(2, 1);
        // Index 2 is one past the last row.
        let _row = owned.row(2);
    }

    // --- OwnedResult::iter ---

    #[test]
    fn owned_result_iter_count() {
        let owned = make_owned_result(4, 2);
        assert_eq!(owned.iter().count(), 4);
    }

    #[test]
    fn owned_result_iter_empty() {
        let owned = make_owned_result(0, 2);
        assert_eq!(owned.iter().count(), 0);
    }

    // --- OwnedResult::Drop releases arena back to pool ---

    #[test]
    fn owned_result_drop_releases_arena() {
        // Build a result around a freshly acquired arena and drop it at once.
        drop(make_owned_result(1, 1));
        // With the arena handed back, a new acquire/release cycle must work.
        let arena = acquire_arena();
        release_arena(arena);
    }

    // --- OwnedResult with zero columns ---

    #[test]
    fn owned_result_zero_columns() {
        // Commands like INSERT without RETURNING have 0 columns.
        let arena = acquire_arena();
        let result = QueryResult::from_parts(vec![], 0, no_columns(), 42);
        let owned = OwnedResult { result, arena };
        assert!(owned.is_empty());
        assert_eq!(owned.len(), 0);
        assert_eq!(owned.result.affected_rows(), 42);
    }

    // --- OwnedResult::without_arena ---

    #[test]
    fn owned_result_without_arena_len_zero() {
        let owned = empty_without_arena();
        assert_eq!(owned.len(), 0);
    }

    #[test]
    fn owned_result_without_arena_is_empty() {
        let owned = empty_without_arena();
        assert!(owned.is_empty());
    }

    #[test]
    fn owned_result_without_arena_with_rows() {
        let only_col = ColumnDesc {
            name: "c0".into(),
            type_oid: 23,
            type_size: 4,
            table_oid: 0,
            column_id: 0,
        };
        let cols: Arc<[ColumnDesc]> = vec![only_col].into();
        let col_offsets = vec![(0, -1); 3]; // 3 rows, 1 col each (all NULL)
        let owned = OwnedResult::without_arena(QueryResult::from_parts(col_offsets, 1, cols, 0));
        assert!(!owned.is_empty());
        assert_eq!(owned.len(), 3);
    }

    // --- OwnedResult Debug ---

    #[test]
    fn owned_result_debug_format() {
        let owned = make_owned_result(5, 2);
        let dbg = format!("{owned:?}");
        let names_struct = dbg.contains("OwnedResult");
        assert!(names_struct, "Debug should contain struct name: {dbg}");
        let shows_rows = dbg.contains("5");
        assert!(shows_rows, "Debug should contain row count: {dbg}");
    }

    // --- OwnedResult drop without_arena variant ---

    #[test]
    fn owned_result_without_arena_drop_does_not_panic() {
        // Must not panic — the arena slot holds Arena::empty().
        drop(empty_without_arena());
    }
}