Skip to main content

bsql_core/
executor.rs

1//! Query dispatch via [`QueryTarget`] — the runtime contract between generated
2//! code and the pool/connection/transaction.
3//!
4//! Code generated by `bsql::query!` converts the user-supplied executor into a
5//! [`QueryTarget`] via `Into`, then calls `query_raw` / `query_raw_readonly` /
6//! `execute_raw` on it. This enum dispatch replaces the old `Executor` trait
7//! and eliminates `Mutex` from `PoolConnection` and `Transaction`.
8
9use bsql_driver_postgres::arena::release_arena;
10use bsql_driver_postgres::codec::Encode;
11use bsql_driver_postgres::{Arena, QueryResult};
12
13use crate::error::{BsqlError, BsqlResult};
14use crate::pool::{Pool, PoolConnection};
15use crate::transaction::Transaction;
16
17/// Owned query result that carries its arena alongside the result metadata.
18///
19/// Generated code calls `.row(i)` to access individual rows. This struct
20/// bundles the arena with the result so callsites don't manage arenas manually.
21pub struct OwnedResult {
22    pub result: QueryResult,
23    arena: Arena,
24}
25
26impl std::fmt::Debug for OwnedResult {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("OwnedResult")
29            .field("rows", &self.result.len())
30            .finish()
31    }
32}
33
34impl OwnedResult {
35    /// Create without arena — for queries that use data_buf instead of arena.
36    /// Zero allocation: Arena::empty() allocates nothing.
37    pub(crate) fn without_arena(result: QueryResult) -> Self {
38        Self {
39            result,
40            arena: Arena::empty(),
41        }
42    }
43
44    /// Number of rows.
45    #[inline]
46    pub fn len(&self) -> usize {
47        self.result.len()
48    }
49
50    /// Whether the result set is empty.
51    #[inline]
52    pub fn is_empty(&self) -> bool {
53        self.result.is_empty()
54    }
55
56    /// Get a row by index.
57    #[inline]
58    pub fn row(&self, idx: usize) -> bsql_driver_postgres::Row<'_> {
59        self.result.row(idx, &self.arena)
60    }
61
62    /// Iterate over rows.
63    #[inline]
64    pub fn iter(&self) -> impl Iterator<Item = bsql_driver_postgres::Row<'_>> {
65        self.result.rows(&self.arena)
66    }
67}
68
69impl Drop for OwnedResult {
70    fn drop(&mut self) {
71        // Return arena to thread-local pool.
72        let arena = std::mem::take(&mut self.arena);
73        release_arena(arena);
74        // Return data buffer to thread-local pool for reuse by next query.
75        if let Some(buf) = self.result.take_data_buf() {
76            bsql_driver_postgres::release_resp_buf(buf);
77        }
78        // Return column offsets buffer to thread-local pool.
79        let col_offsets = self.result.take_col_offsets();
80        if col_offsets.capacity() > 0 {
81            bsql_driver_postgres::release_col_offsets(col_offsets);
82        }
83    }
84}
85
86// ---------------------------------------------------------------------------
87// QueryTarget — enum dispatch for Pool / PoolConnection / Transaction
88// ---------------------------------------------------------------------------
89
90/// Concrete enum for query dispatch. Replaces the `Executor` trait.
91///
92/// Generated code converts the user-supplied executor (`&Pool`,
93/// `&mut PoolConnection`, or `&mut Transaction`) into `QueryTarget` via
94/// `Into`, then calls `query_raw` / `query_raw_readonly` / `execute_raw`.
95///
96/// `Pool` takes a shared reference (`&Pool`) because it acquires from the
97/// pool internally. `PoolConnection` and `Transaction` take exclusive
98/// references (`&mut`) — no `Mutex` needed.
99pub enum QueryTarget<'a> {
100    Pool(&'a Pool),
101    Conn(&'a mut PoolConnection),
102    Tx(&'a mut Transaction),
103}
104
105impl<'a> From<&'a Pool> for QueryTarget<'a> {
106    #[inline]
107    fn from(pool: &'a Pool) -> Self {
108        QueryTarget::Pool(pool)
109    }
110}
111
112impl<'a> From<&'a mut PoolConnection> for QueryTarget<'a> {
113    #[inline]
114    fn from(conn: &'a mut PoolConnection) -> Self {
115        QueryTarget::Conn(conn)
116    }
117}
118
119impl<'a> From<&'a mut Transaction> for QueryTarget<'a> {
120    #[inline]
121    fn from(tx: &'a mut Transaction) -> Self {
122        QueryTarget::Tx(tx)
123    }
124}
125
126// --- Async QueryTarget methods ---
127
#[cfg(feature = "async")]
impl<'a> QueryTarget<'a> {
    /// Runs `sql` with `params` and returns the resulting rows.
    ///
    /// * `Pool` — acquires a connection from `pool.inner` and awaits the query on it.
    /// * `Conn` — runs the query on the borrowed connection.
    /// * `Tx` — forwards to `Transaction::query_inner`.
    ///
    /// Driver failures from the query are converted with
    /// `BsqlError::from_driver_query`; acquisition failures with `BsqlError::from`.
    ///
    /// NOTE(review): the `Conn` and `Tx` arms call methods that are not awaited
    /// even though this is an `async fn` — confirm that is intended.
    #[inline]
    pub async fn query_raw(
        self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult> {
        match self {
            QueryTarget::Tx(tx) => tx.query_inner(sql, sql_hash, params),
            QueryTarget::Conn(conn) => {
                let outcome = conn.inner.query(sql, sql_hash, params);
                outcome
                    .map(OwnedResult::without_arena)
                    .map_err(BsqlError::from_driver_query)
            }
            QueryTarget::Pool(pool) => {
                let mut lease = pool.inner.acquire_async().await.map_err(BsqlError::from)?;
                let outcome = lease.query_async(sql, sql_hash, params).await;
                outcome
                    .map(OwnedResult::without_arena)
                    .map_err(BsqlError::from_driver_query)
            }
        }
    }

    /// Runs a read-only query and returns the resulting rows.
    ///
    /// For `Pool`, the connection comes from `pool.read_pool` when one is set
    /// and from `pool.inner` otherwise. `Conn` and `Tx` have no alternative
    /// pool to pick from, so they take exactly the `query_raw` path.
    #[inline]
    pub async fn query_raw_readonly(
        self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult> {
        match self {
            QueryTarget::Pool(pool) => {
                // Prefer the read pool; fall back to the primary pool.
                let source = match pool.read_pool.as_ref() {
                    Some(read_pool) => read_pool,
                    None => &pool.inner,
                };
                let mut lease = source.acquire_async().await.map_err(BsqlError::from)?;
                let outcome = lease.query_async(sql, sql_hash, params).await;
                outcome
                    .map(OwnedResult::without_arena)
                    .map_err(BsqlError::from_driver_query)
            }
            target @ (QueryTarget::Conn(_) | QueryTarget::Tx(_)) => {
                target.query_raw(sql, sql_hash, params).await
            }
        }
    }

    /// Runs a statement and returns the number of affected rows.
    ///
    /// Dispatch mirrors `query_raw`: `Pool` acquires from `pool.inner`, `Conn`
    /// uses the borrowed connection, and `Tx` forwards to
    /// `Transaction::execute_inner`.
    #[inline]
    pub async fn execute_raw(
        self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<u64> {
        match self {
            QueryTarget::Tx(tx) => tx.execute_inner(sql, sql_hash, params),
            QueryTarget::Conn(conn) => {
                let outcome = conn.inner.execute(sql, sql_hash, params);
                outcome.map_err(BsqlError::from_driver_query)
            }
            QueryTarget::Pool(pool) => {
                let mut lease = pool.inner.acquire_async().await.map_err(BsqlError::from)?;
                let outcome = lease.execute_async(sql, sql_hash, params).await;
                outcome.map_err(BsqlError::from_driver_query)
            }
        }
    }
}
212
213// --- Sync QueryTarget methods ---
214
215#[cfg(not(feature = "async"))]
216impl<'a> QueryTarget<'a> {
217    /// Execute a query and return all rows.
218    #[inline]
219    pub fn query_raw(
220        self,
221        sql: &str,
222        sql_hash: u64,
223        params: &[&(dyn Encode + Sync)],
224    ) -> BsqlResult<OwnedResult> {
225        match self {
226            QueryTarget::Pool(pool) => {
227                let mut guard = pool.inner.acquire().map_err(BsqlError::from)?;
228                let result = guard
229                    .query(sql, sql_hash, params)
230                    .map_err(BsqlError::from_driver_query)?;
231                Ok(OwnedResult::without_arena(result))
232            }
233            QueryTarget::Conn(conn) => {
234                let result = conn
235                    .inner
236                    .query(sql, sql_hash, params)
237                    .map_err(BsqlError::from_driver_query)?;
238                Ok(OwnedResult::without_arena(result))
239            }
240            QueryTarget::Tx(tx) => tx.query_inner(sql, sql_hash, params),
241        }
242    }
243
244    /// Execute a read-only query. Routes to replicas for Pool when configured.
245    #[inline]
246    pub fn query_raw_readonly(
247        self,
248        sql: &str,
249        sql_hash: u64,
250        params: &[&(dyn Encode + Sync)],
251    ) -> BsqlResult<OwnedResult> {
252        match self {
253            QueryTarget::Pool(pool) => {
254                let driver_pool = pool.read_pool.as_ref().unwrap_or(&pool.inner);
255                let mut guard = driver_pool.acquire().map_err(BsqlError::from)?;
256                let result = guard
257                    .query(sql, sql_hash, params)
258                    .map_err(BsqlError::from_driver_query)?;
259                Ok(OwnedResult::without_arena(result))
260            }
261            QueryTarget::Conn(conn) => {
262                let result = conn
263                    .inner
264                    .query(sql, sql_hash, params)
265                    .map_err(BsqlError::from_driver_query)?;
266                Ok(OwnedResult::without_arena(result))
267            }
268            QueryTarget::Tx(tx) => tx.query_inner(sql, sql_hash, params),
269        }
270    }
271
272    /// Execute a query and return the number of affected rows.
273    #[inline]
274    pub fn execute_raw(
275        self,
276        sql: &str,
277        sql_hash: u64,
278        params: &[&(dyn Encode + Sync)],
279    ) -> BsqlResult<u64> {
280        match self {
281            QueryTarget::Pool(pool) => {
282                let mut guard = pool.inner.acquire().map_err(BsqlError::from)?;
283                guard
284                    .execute(sql, sql_hash, params)
285                    .map_err(BsqlError::from_driver_query)
286            }
287            QueryTarget::Conn(conn) => conn
288                .inner
289                .execute(sql, sql_hash, params)
290                .map_err(BsqlError::from_driver_query),
291            QueryTarget::Tx(tx) => tx.execute_inner(sql, sql_hash, params),
292        }
293    }
294}
295
#[cfg(test)]
mod tests {
    //! Unit tests for [`OwnedResult`] and the [`QueryTarget`] conversions.
    //!
    //! None of these tests open a database connection: results are assembled
    //! with `QueryResult::from_parts`, and the `QueryTarget` checks are
    //! compile-time only (nested functions that are type-checked but never
    //! called).

    use super::*;
    use bsql_driver_postgres::arena::{acquire_arena, release_arena};
    use bsql_driver_postgres::{ColumnDesc, QueryResult};
    use std::sync::Arc;

    /// Helper: build an `OwnedResult` with `num_rows` rows and `num_cols` columns.
    ///
    /// Every column-offset entry is the dummy pair `(0, -1)` (marked as NULL
    /// columns), which is enough to exercise `len` / `is_empty` / row counting
    /// without decoding real data.
    fn make_owned_result(num_rows: usize, num_cols: usize) -> OwnedResult {
        let arena = acquire_arena();
        let cols: Arc<[ColumnDesc]> = (0..num_cols)
            .map(|i| ColumnDesc {
                name: format!("c{i}").into(),
                type_oid: 23, // int4
                type_size: 4,
                table_oid: 0,
                column_id: 0,
            })
            .collect::<Vec<_>>()
            .into();

        let col_offsets: Vec<(usize, i32)> = vec![(0, -1); num_rows * num_cols]; // NULL columns
        let result = QueryResult::from_parts(col_offsets, num_cols, cols, 0);
        OwnedResult { result, arena }
    }

    // --- OwnedResult ---

    #[test]
    fn owned_result_new_zero_rows() {
        let owned = make_owned_result(0, 2);
        assert_eq!(owned.len(), 0);
        assert!(owned.is_empty());
    }

    #[test]
    fn owned_result_new_single_row() {
        let owned = make_owned_result(1, 3);
        assert_eq!(owned.len(), 1);
        assert!(!owned.is_empty());
    }

    #[test]
    fn owned_result_new_multiple_rows() {
        let owned = make_owned_result(5, 2);
        assert_eq!(owned.len(), 5);
        assert!(!owned.is_empty());
    }

    // --- OwnedResult::row ---

    #[test]
    fn owned_result_row_access() {
        let owned = make_owned_result(3, 2);
        // Should not panic for valid indices
        let _r0 = owned.row(0);
        let _r1 = owned.row(1);
        let _r2 = owned.row(2);
    }

    #[test]
    #[should_panic]
    fn owned_result_row_out_of_bounds_panics() {
        let owned = make_owned_result(2, 1);
        let _r = owned.row(2); // out of bounds
    }

    // --- OwnedResult::iter ---

    #[test]
    fn owned_result_iter_count() {
        let owned = make_owned_result(4, 2);
        let count = owned.iter().count();
        assert_eq!(count, 4);
    }

    #[test]
    fn owned_result_iter_empty() {
        let owned = make_owned_result(0, 2);
        let count = owned.iter().count();
        assert_eq!(count, 0);
    }

    // --- OwnedResult::Drop releases arena back to pool ---

    #[test]
    fn owned_result_drop_releases_arena() {
        // Smoke test: dropping an OwnedResult must not panic, and acquiring /
        // releasing an arena afterwards must still work.
        let owned = make_owned_result(1, 1);
        drop(owned);
        let arena = acquire_arena();
        release_arena(arena);
    }

    // --- OwnedResult with zero columns ---

    #[test]
    fn owned_result_zero_columns() {
        // Commands like INSERT without RETURNING have 0 columns
        let arena = acquire_arena();
        let cols: Arc<[ColumnDesc]> = Arc::from(Vec::new());
        let result = QueryResult::from_parts(vec![], 0, cols, 42);
        let owned = OwnedResult { result, arena };
        assert_eq!(owned.len(), 0);
        assert!(owned.is_empty());
        assert_eq!(owned.result.affected_rows(), 42);
    }

    // --- OwnedResult::without_arena ---

    #[test]
    fn owned_result_without_arena_len_zero() {
        let cols: Arc<[ColumnDesc]> = Arc::from(Vec::new());
        let result = QueryResult::from_parts(vec![], 0, cols, 0);
        let owned = OwnedResult::without_arena(result);
        assert_eq!(owned.len(), 0);
    }

    #[test]
    fn owned_result_without_arena_is_empty() {
        let cols: Arc<[ColumnDesc]> = Arc::from(Vec::new());
        let result = QueryResult::from_parts(vec![], 0, cols, 0);
        let owned = OwnedResult::without_arena(result);
        assert!(owned.is_empty());
    }

    #[test]
    fn owned_result_without_arena_with_rows() {
        let cols: Arc<[ColumnDesc]> = vec![ColumnDesc {
            name: "c0".into(),
            type_oid: 23,
            type_size: 4,
            table_oid: 0,
            column_id: 0,
        }]
        .into();
        let col_offsets = vec![(0, -1); 3]; // 3 rows, 1 col each (all NULL)
        let result = QueryResult::from_parts(col_offsets, 1, cols, 0);
        let owned = OwnedResult::without_arena(result);
        assert_eq!(owned.len(), 3);
        assert!(!owned.is_empty());
    }

    // --- OwnedResult Debug ---

    #[test]
    fn owned_result_debug_format() {
        let owned = make_owned_result(5, 2);
        let dbg = format!("{owned:?}");
        assert!(
            dbg.contains("OwnedResult"),
            "Debug should contain struct name: {dbg}"
        );
        assert!(dbg.contains("5"), "Debug should contain row count: {dbg}");
    }

    // --- OwnedResult drop without_arena variant ---

    #[test]
    fn owned_result_without_arena_drop_does_not_panic() {
        let cols: Arc<[ColumnDesc]> = Arc::from(Vec::new());
        let result = QueryResult::from_parts(vec![], 0, cols, 0);
        let owned = OwnedResult::without_arena(result);
        drop(owned); // Must not panic — arena is Arena::empty()
    }

    // --- Pool / PoolConnection / Transaction Send constraints ---
    // These are compile-time checks: each test fails to build if the bound
    // does not hold. Only `Send` is asserted for PoolConnection and
    // Transaction; nothing here checks whether they are `Sync`.

    #[test]
    fn pool_is_send_and_sync() {
        fn _assert_send<T: Send>() {}
        fn _assert_sync<T: Sync>() {}
        _assert_send::<crate::pool::Pool>();
        _assert_sync::<crate::pool::Pool>();
    }

    #[test]
    fn pool_connection_is_send() {
        fn _assert_send<T: Send>() {}
        _assert_send::<crate::pool::PoolConnection>();
    }

    #[test]
    fn transaction_is_send() {
        fn _assert_send<T: Send>() {}
        _assert_send::<crate::transaction::Transaction>();
    }

    #[test]
    fn owned_result_is_send_and_sync() {
        fn _assert_send<T: Send>() {}
        fn _assert_sync<T: Sync>() {}
        _assert_send::<OwnedResult>();
        _assert_sync::<OwnedResult>();
    }

    // --- QueryTarget From impls ---
    // The nested functions are never called, but they must type-check, so each
    // test stops compiling if the corresponding `From` impl is removed.

    #[test]
    fn query_target_from_pool_compiles() {
        // Compile-time test: From<&Pool> for QueryTarget exists
        fn _check<'a>(pool: &'a Pool) -> QueryTarget<'a> {
            QueryTarget::from(pool)
        }
    }

    #[test]
    fn query_target_from_pool_connection_compiles() {
        // Compile-time test: From<&mut PoolConnection> for QueryTarget exists
        fn _check<'a>(conn: &'a mut PoolConnection) -> QueryTarget<'a> {
            QueryTarget::from(conn)
        }
    }

    #[test]
    fn query_target_from_transaction_compiles() {
        // Compile-time test: From<&mut Transaction> for QueryTarget exists
        fn _check<'a>(tx: &'a mut Transaction) -> QueryTarget<'a> {
            QueryTarget::from(tx)
        }
    }

    // --- OwnedResult affected_rows on without_arena variant ---

    #[test]
    fn owned_result_without_arena_affected_rows() {
        let cols: Arc<[ColumnDesc]> = Arc::from(Vec::new());
        let result = QueryResult::from_parts(vec![], 0, cols, 42);
        let owned = OwnedResult::without_arena(result);
        assert_eq!(owned.result.affected_rows(), 42);
    }

    #[test]
    fn owned_result_without_arena_affected_rows_zero() {
        let cols: Arc<[ColumnDesc]> = Arc::from(Vec::new());
        let result = QueryResult::from_parts(vec![], 0, cols, 0);
        let owned = OwnedResult::without_arena(result);
        assert_eq!(owned.result.affected_rows(), 0);
    }

    // --- OwnedResult iter yields correct rows ---

    #[test]
    fn owned_result_iter_yields_all_rows() {
        let owned = make_owned_result(3, 1);
        let rows: Vec<_> = owned.iter().collect();
        assert_eq!(rows.len(), 3);
    }

    // --- OwnedResult Debug format with 0 rows ---

    #[test]
    fn owned_result_debug_format_zero_rows() {
        let owned = make_owned_result(0, 2);
        let dbg = format!("{owned:?}");
        assert!(dbg.contains("OwnedResult"), "should contain name: {dbg}");
        assert!(dbg.contains("0"), "should contain 0: {dbg}");
    }

    // --- OwnedResult row panics for empty result ---

    #[test]
    #[should_panic]
    fn owned_result_row_panics_on_empty() {
        let owned = make_owned_result(0, 1);
        let _r = owned.row(0);
    }

    // --- QueryTarget variant discrimination ---
    // Compile-time only: the nested functions are never invoked, so the
    // `assert!`s inside them do not run.

    #[test]
    fn query_target_pool_variant_matches() {
        fn _check_pool<'a>(pool: &'a Pool) {
            let qt: QueryTarget<'a> = pool.into();
            assert!(matches!(qt, QueryTarget::Pool(_)));
        }
    }

    #[test]
    fn query_target_conn_variant_matches() {
        fn _check_conn<'a>(conn: &'a mut PoolConnection) {
            let qt: QueryTarget<'a> = conn.into();
            assert!(matches!(qt, QueryTarget::Conn(_)));
        }
    }

    #[test]
    fn query_target_tx_variant_matches() {
        fn _check_tx<'a>(tx: &'a mut Transaction) {
            let qt: QueryTarget<'a> = tx.into();
            assert!(matches!(qt, QueryTarget::Tx(_)));
        }
    }

    // --- OwnedResult large row count ---

    #[test]
    fn owned_result_large_row_count() {
        let owned = make_owned_result(1000, 2);
        assert_eq!(owned.len(), 1000);
        assert!(!owned.is_empty());
        assert_eq!(owned.iter().count(), 1000);
    }

    // --- OwnedResult single column ---

    #[test]
    fn owned_result_single_column_row_access() {
        let owned = make_owned_result(2, 1);
        let _r0 = owned.row(0);
        let _r1 = owned.row(1);
        assert_eq!(owned.len(), 2);
    }

    // --- Multiple drops don't panic ---

    #[test]
    fn owned_result_drop_twice_via_option() {
        let owned = make_owned_result(1, 1);
        let mut opt = Some(owned);
        // Taking the value out drops the OwnedResult here.
        drop(opt.take());
        // Dropping the now-`None` option must not panic.
        drop(opt);
    }
}