Skip to main content

bsql_core/
executor.rs

//! The `Executor` trait — the runtime contract between generated code and the pool.
//!
//! Code generated by `bsql::query!` calls methods on this trait. `Pool`,
//! `PoolConnection`, and `Transaction` all implement it.
//!
//! The `query_raw` / `query_raw_readonly` methods use the bsql-driver's arena-based
//! row storage. Generated code decodes columns from `Row` via typed getters.
8
9use bsql_driver_postgres::arena::release_arena;
10use bsql_driver_postgres::codec::Encode;
11use bsql_driver_postgres::{Arena, QueryResult};
12
13use crate::error::{BsqlError, BsqlResult};
14use crate::pool::{Pool, PoolConnection};
15use crate::transaction::Transaction;
16
/// Owned query result that carries its arena alongside the result metadata.
///
/// Generated code calls `.row(i)` (or `.iter()`) to access individual rows.
/// Both accessors hand the private arena to the wrapped [`QueryResult`], so
/// call sites never manage arenas manually. Dropping the value hands the arena
/// and the result's reusable buffers back to the driver (see the `Drop` impl).
pub struct OwnedResult {
    /// The driver-level query result owned by this wrapper.
    pub result: QueryResult,
    /// Arena passed to `QueryResult::row` / `QueryResult::rows` when rows are
    /// read. It is `Arena::empty()` for results built via `without_arena`.
    arena: Arena,
}
25
26impl std::fmt::Debug for OwnedResult {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("OwnedResult")
29            .field("rows", &self.result.len())
30            .finish()
31    }
32}
33
34impl OwnedResult {
35    /// Create without arena — for queries that use data_buf instead of arena.
36    /// Zero allocation: Arena::empty() allocates nothing.
37    pub(crate) fn without_arena(result: QueryResult) -> Self {
38        Self {
39            result,
40            arena: Arena::empty(),
41        }
42    }
43
44    /// Number of rows.
45    pub fn len(&self) -> usize {
46        self.result.len()
47    }
48
49    /// Whether the result set is empty.
50    pub fn is_empty(&self) -> bool {
51        self.result.is_empty()
52    }
53
54    /// Get a row by index.
55    pub fn row(&self, idx: usize) -> bsql_driver_postgres::Row<'_> {
56        self.result.row(idx, &self.arena)
57    }
58
59    /// Iterate over rows.
60    pub fn iter(&self) -> impl Iterator<Item = bsql_driver_postgres::Row<'_>> {
61        self.result.rows(&self.arena)
62    }
63}
64
65impl Drop for OwnedResult {
66    fn drop(&mut self) {
67        // Return arena to thread-local pool.
68        let arena = std::mem::take(&mut self.arena);
69        release_arena(arena);
70        // Return data buffer to thread-local pool for reuse by next query.
71        if let Some(buf) = self.result.take_data_buf() {
72            bsql_driver_postgres::release_resp_buf(buf);
73        }
74        // Return column offsets buffer to thread-local pool.
75        let col_offsets = self.result.take_col_offsets();
76        if col_offsets.capacity() > 0 {
77            bsql_driver_postgres::release_col_offsets(col_offsets);
78        }
79    }
80}
81
// ---------------------------------------------------------------------------
// Executor trait — async (RPITIT) when feature="async", sync otherwise
// ---------------------------------------------------------------------------
85
/// Execute prepared queries and return rows or affected-row counts.
///
/// The generated code calls `query_raw`, `query_raw_readonly`, and
/// `execute_raw` on `&Pool`, `&PoolConnection`, or `&Transaction`.
///
/// When the `async` feature is enabled, methods return `impl Future + Send`
/// via RPITIT (Return Position Impl Trait In Trait), enabling true async I/O
/// without `block_in_place`. This requires Rust 1.75+ (MSRV is 1.85).
///
/// When the `async` feature is disabled, methods return results directly
/// as regular synchronous functions. No tokio dependency.
///
/// All three methods take the same arguments:
/// - `sql`: the SQL text to run.
/// - `sql_hash`: a hash accompanying `sql`, forwarded unchanged to the driver
///   (presumably the prepared-statement cache key — confirm against the driver).
/// - `params`: the bind parameters, each encodable via `Encode`.
///
/// The returned future borrows `self`, `sql`, and `params` for `'a`.
#[cfg(feature = "async")]
pub trait Executor: Send + Sync {
    /// Execute a query and return all rows.
    fn query_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a;

    /// Execute a read-only query. Routes to replicas when configured.
    fn query_raw_readonly<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a;

    /// Execute a query and return the number of affected rows.
    fn execute_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send + 'a;
}
123
/// Sync variant of the Executor trait. No async runtime required.
///
/// Compiled only when the `async` feature is disabled. Each method returns its
/// result directly instead of a future.
///
/// All three methods take the same arguments:
/// - `sql`: the SQL text to run.
/// - `sql_hash`: a hash accompanying `sql`, forwarded unchanged to the driver
///   (presumably the prepared-statement cache key — confirm against the driver).
/// - `params`: the bind parameters, each encodable via `Encode`.
#[cfg(not(feature = "async"))]
pub trait Executor {
    /// Execute a query and return all rows.
    fn query_raw(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult>;

    /// Execute a read-only query. Implementations may route it differently
    /// from `query_raw` (e.g. to a read pool) or simply forward to it.
    fn query_raw_readonly(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult>;

    /// Execute a statement and return the number of affected rows.
    fn execute_raw(
        &self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<u64>;
}
148
// ---------------------------------------------------------------------------
// Pool — true async when feature="async", sync otherwise
// ---------------------------------------------------------------------------
152
/// Async `Executor` for `Pool`.
///
/// Every call awaits `acquire_async()` for a connection and then awaits the
/// driver's `*_async` method on it. There is no `block_in_place` and no
/// `Handle::current().block_on()`, so the runtime stays free to run other
/// tasks while this one waits on PostgreSQL.
#[cfg(feature = "async")]
#[allow(clippy::manual_async_fn)] // Intentional: RPITIT gives us + Send bound that async fn doesn't
impl Executor for Pool {
    /// Run `sql` on a connection taken from the primary pool (`self.inner`).
    #[inline]
    fn query_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        async move {
            let mut conn = self.inner.acquire_async().await.map_err(BsqlError::from)?;
            conn.query_async(sql, sql_hash, params)
                .await
                .map(OwnedResult::without_arena)
                .map_err(BsqlError::from_driver_query)
        }
    }

    /// Run `sql` on the read pool when one is configured, otherwise on the
    /// primary pool.
    #[inline]
    fn query_raw_readonly<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        async move {
            // Fall back to the primary pool when no read pool is set.
            let target = match &self.read_pool {
                Some(replica) => replica,
                None => &self.inner,
            };
            let mut conn = target.acquire_async().await.map_err(BsqlError::from)?;
            conn.query_async(sql, sql_hash, params)
                .await
                .map(OwnedResult::without_arena)
                .map_err(BsqlError::from_driver_query)
        }
    }

    /// Run `sql` on the primary pool and report the affected-row count.
    #[inline]
    fn execute_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send + 'a {
        async move {
            let mut conn = self.inner.acquire_async().await.map_err(BsqlError::from)?;
            let affected = conn
                .execute_async(sql, sql_hash, params)
                .await
                .map_err(BsqlError::from_driver_query)?;
            Ok(affected)
        }
    }
}
210
211/// Sync: plain `acquire()` + `query()`. No tokio.
212#[cfg(not(feature = "async"))]
213impl Executor for Pool {
214    #[inline]
215    fn query_raw(
216        &self,
217        sql: &str,
218        sql_hash: u64,
219        params: &[&(dyn Encode + Sync)],
220    ) -> BsqlResult<OwnedResult> {
221        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
222        let result = guard
223            .query(sql, sql_hash, params)
224            .map_err(BsqlError::from_driver_query)?;
225        Ok(OwnedResult::without_arena(result))
226    }
227
228    #[inline]
229    fn query_raw_readonly(
230        &self,
231        sql: &str,
232        sql_hash: u64,
233        params: &[&(dyn Encode + Sync)],
234    ) -> BsqlResult<OwnedResult> {
235        let pool = self.read_pool.as_ref().unwrap_or(&self.inner);
236        let mut guard = pool.acquire().map_err(BsqlError::from)?;
237        let result = guard
238            .query(sql, sql_hash, params)
239            .map_err(BsqlError::from_driver_query)?;
240        Ok(OwnedResult::without_arena(result))
241    }
242
243    #[inline]
244    fn execute_raw(
245        &self,
246        sql: &str,
247        sql_hash: u64,
248        params: &[&(dyn Encode + Sync)],
249    ) -> BsqlResult<u64> {
250        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
251        guard
252            .execute(sql, sql_hash, params)
253            .map_err(BsqlError::from_driver_query)
254    }
255}
256
// ---------------------------------------------------------------------------
// PoolConnection — always sync internally (mutex lock is instantaneous)
// ---------------------------------------------------------------------------
260
/// Async-facing `Executor` for a pinned `PoolConnection`.
///
/// The connection sits behind `self.inner`'s mutex. Each call locks it and
/// runs the driver's synchronous `query` / `execute`; the lock guard is never
/// held across an `.await`.
///
/// NOTE(review): the driver call runs to completion inside the future without
/// yielding — confirm that is acceptable for the intended runtime.
#[cfg(feature = "async")]
#[allow(clippy::manual_async_fn)]
impl Executor for PoolConnection {
    /// Run `sql` on this connection and return all rows.
    #[inline]
    fn query_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        async move {
            // A poisoned lock is recovered rather than propagated.
            let mut conn = match self.inner.lock() {
                Ok(held) => held,
                Err(poisoned) => poisoned.into_inner(),
            };
            conn.query(sql, sql_hash, params)
                .map(OwnedResult::without_arena)
                .map_err(BsqlError::from_driver_query)
        }
    }

    /// Same as `query_raw`: a pinned connection has nowhere else to route to.
    #[inline]
    fn query_raw_readonly<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        self.query_raw(sql, sql_hash, params)
    }

    /// Run `sql` on this connection and report the affected-row count.
    #[inline]
    fn execute_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send + 'a {
        async move {
            // A poisoned lock is recovered rather than propagated.
            let mut conn = match self.inner.lock() {
                Ok(held) => held,
                Err(poisoned) => poisoned.into_inner(),
            };
            let affected = conn
                .execute(sql, sql_hash, params)
                .map_err(BsqlError::from_driver_query)?;
            Ok(affected)
        }
    }
}
305
306#[cfg(not(feature = "async"))]
307impl Executor for PoolConnection {
308    #[inline]
309    fn query_raw(
310        &self,
311        sql: &str,
312        sql_hash: u64,
313        params: &[&(dyn Encode + Sync)],
314    ) -> BsqlResult<OwnedResult> {
315        let mut guard = self.inner.lock().unwrap_or_else(|e| e.into_inner());
316        let result = guard
317            .query(sql, sql_hash, params)
318            .map_err(BsqlError::from_driver_query)?;
319        Ok(OwnedResult::without_arena(result))
320    }
321
322    #[inline]
323    fn query_raw_readonly(
324        &self,
325        sql: &str,
326        sql_hash: u64,
327        params: &[&(dyn Encode + Sync)],
328    ) -> BsqlResult<OwnedResult> {
329        self.query_raw(sql, sql_hash, params)
330    }
331
332    #[inline]
333    fn execute_raw(
334        &self,
335        sql: &str,
336        sql_hash: u64,
337        params: &[&(dyn Encode + Sync)],
338    ) -> BsqlResult<u64> {
339        let mut guard = self.inner.lock().unwrap_or_else(|e| e.into_inner());
340        guard
341            .execute(sql, sql_hash, params)
342            .map_err(BsqlError::from_driver_query)
343    }
344}
345
// ---------------------------------------------------------------------------
// Transaction — always sync internally (mutex + driver call)
// ---------------------------------------------------------------------------
349
/// Async-facing `Executor` for `Transaction`.
///
/// Each method wraps the synchronous `self.query_inner` / `self.execute_inner`
/// call in an `async move` block; nothing is awaited inside.
#[cfg(feature = "async")]
#[allow(clippy::manual_async_fn)] // Intentional: RPITIT return type carries the `+ Send` bound
impl Executor for Transaction {
    /// Run `sql` via `self.query_inner` and return all rows.
    // `#[inline]` added so this thin forwarder matches its siblings and every
    // other `Executor` impl in this file; it is called from generated code in
    // downstream crates.
    #[inline]
    fn query_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        async move { self.query_inner(sql, sql_hash, params) }
    }

    /// Same as `query_raw`: this impl does no separate read-only routing.
    #[inline]
    fn query_raw_readonly<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        self.query_raw(sql, sql_hash, params)
    }

    /// Run `sql` via `self.execute_inner` and report the affected-row count.
    #[inline]
    fn execute_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send + 'a {
        async move { self.execute_inner(sql, sql_hash, params) }
    }
}
382
383#[cfg(not(feature = "async"))]
384impl Executor for Transaction {
385    fn query_raw(
386        &self,
387        sql: &str,
388        sql_hash: u64,
389        params: &[&(dyn Encode + Sync)],
390    ) -> BsqlResult<OwnedResult> {
391        self.query_inner(sql, sql_hash, params)
392    }
393
394    #[inline]
395    fn query_raw_readonly(
396        &self,
397        sql: &str,
398        sql_hash: u64,
399        params: &[&(dyn Encode + Sync)],
400    ) -> BsqlResult<OwnedResult> {
401        self.query_raw(sql, sql_hash, params)
402    }
403
404    #[inline]
405    fn execute_raw(
406        &self,
407        sql: &str,
408        sql_hash: u64,
409        params: &[&(dyn Encode + Sync)],
410    ) -> BsqlResult<u64> {
411        self.execute_inner(sql, sql_hash, params)
412    }
413}
414
#[cfg(test)]
mod tests {
    use super::*;
    use bsql_driver_postgres::arena::{acquire_arena, release_arena};
    use bsql_driver_postgres::{ColumnDesc, QueryResult};
    use std::sync::Arc;

    // --- Shared fixtures ---

    /// Describe an `int4` column named `c{idx}`.
    fn int4_column(idx: usize) -> ColumnDesc {
        ColumnDesc {
            name: format!("c{idx}").into(),
            type_oid: 23, // int4
            type_size: 4,
            table_oid: 0,
            column_id: 0,
        }
    }

    /// An empty column-description list.
    fn no_columns() -> Arc<[ColumnDesc]> {
        Arc::from(Vec::new())
    }

    /// Zero rows, zero columns, zero affected rows, no arena.
    fn empty_unpooled_result() -> OwnedResult {
        OwnedResult::without_arena(QueryResult::from_parts(vec![], 0, no_columns(), 0))
    }

    /// Build an arena-backed `OwnedResult` with `num_rows` rows of `num_cols`
    /// columns. Every cell's offset entry is `(0, -1)` (a NULL column), which
    /// is enough to exercise len / is_empty / row access without real data.
    fn make_owned_result(num_rows: usize, num_cols: usize) -> OwnedResult {
        let arena = acquire_arena();
        let cols: Arc<[ColumnDesc]> = (0..num_cols).map(int4_column).collect::<Vec<_>>().into();
        let col_offsets: Vec<(usize, i32)> = vec![(0, -1); num_rows * num_cols];
        OwnedResult {
            result: QueryResult::from_parts(col_offsets, num_cols, cols, 0),
            arena,
        }
    }

    /// Compile-time check that `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    // --- OwnedResult: len / is_empty ---

    #[test]
    fn owned_result_new_zero_rows() {
        let res = make_owned_result(0, 2);
        assert!(res.is_empty());
        assert_eq!(res.len(), 0);
    }

    #[test]
    fn owned_result_new_single_row() {
        let res = make_owned_result(1, 3);
        assert!(!res.is_empty());
        assert_eq!(res.len(), 1);
    }

    #[test]
    fn owned_result_new_multiple_rows() {
        let res = make_owned_result(5, 2);
        assert!(!res.is_empty());
        assert_eq!(res.len(), 5);
    }

    // --- OwnedResult::row ---

    #[test]
    fn owned_result_row_access() {
        let res = make_owned_result(3, 2);
        // Every valid index must be readable without panicking.
        for idx in 0..3 {
            let _row = res.row(idx);
        }
    }

    #[test]
    #[should_panic]
    fn owned_result_row_out_of_bounds_panics() {
        let res = make_owned_result(2, 1);
        let _row = res.row(2); // one past the last valid index
    }

    // --- OwnedResult::iter ---

    #[test]
    fn owned_result_iter_count() {
        let res = make_owned_result(4, 2);
        assert_eq!(res.iter().count(), 4);
    }

    #[test]
    fn owned_result_iter_empty() {
        let res = make_owned_result(0, 2);
        assert_eq!(res.iter().count(), 0);
    }

    // --- OwnedResult::Drop releases arena back to pool ---

    #[test]
    fn owned_result_drop_releases_arena() {
        // Wrap an acquired arena in an OwnedResult and drop it; acquiring and
        // releasing another arena afterwards must still work.
        drop(make_owned_result(1, 1));
        let again = acquire_arena();
        release_arena(again);
    }

    // --- OwnedResult with zero columns ---

    #[test]
    fn owned_result_zero_columns() {
        // Commands like INSERT without RETURNING have 0 columns.
        let arena = acquire_arena();
        let result = QueryResult::from_parts(vec![], 0, no_columns(), 42);
        let res = OwnedResult { result, arena };
        assert_eq!(res.len(), 0);
        assert!(res.is_empty());
        assert_eq!(res.result.affected_rows(), 42);
    }

    // --- OwnedResult::without_arena ---

    #[test]
    fn owned_result_without_arena_len_zero() {
        assert_eq!(empty_unpooled_result().len(), 0);
    }

    #[test]
    fn owned_result_without_arena_is_empty() {
        assert!(empty_unpooled_result().is_empty());
    }

    #[test]
    fn owned_result_without_arena_with_rows() {
        let cols: Arc<[ColumnDesc]> = vec![int4_column(0)].into();
        let col_offsets = vec![(0, -1); 3]; // 3 rows, 1 col each (all NULL)
        let res = OwnedResult::without_arena(QueryResult::from_parts(col_offsets, 1, cols, 0));
        assert_eq!(res.len(), 3);
        assert!(!res.is_empty());
    }

    // --- OwnedResult Debug ---

    #[test]
    fn owned_result_debug_format() {
        let owned = make_owned_result(5, 2);
        let dbg = format!("{owned:?}");
        assert!(
            dbg.contains("OwnedResult"),
            "Debug should contain struct name: {dbg}"
        );
        assert!(dbg.contains("5"), "Debug should contain row count: {dbg}");
    }

    // --- OwnedResult drop without_arena variant ---

    #[test]
    fn owned_result_without_arena_drop_does_not_panic() {
        // Must not panic — the arena is Arena::empty().
        drop(empty_unpooled_result());
    }

    // --- Pool / PoolConnection / Transaction Send+Sync constraints ---

    #[test]
    fn pool_is_send_and_sync() {
        assert_send_sync::<crate::pool::Pool>();
    }

    #[test]
    fn pool_connection_is_send_and_sync() {
        assert_send_sync::<crate::pool::PoolConnection>();
    }

    #[test]
    fn transaction_is_send_and_sync() {
        assert_send_sync::<crate::transaction::Transaction>();
    }

    #[test]
    fn owned_result_is_send_and_sync() {
        assert_send_sync::<OwnedResult>();
    }

    #[cfg(feature = "async")]
    #[test]
    fn executor_trait_requires_send_sync() {
        // Compile-time check: this generic fn only type-checks if the async
        // `Executor` trait implies `Send + Sync` for every implementor.
        fn _check_executor_bounds<E: Executor>() {
            assert_send_sync::<E>();
        }
    }
}