Skip to main content

bsql_core/
executor.rs

//! The `Executor` trait -- the runtime contract between generated code and the pool.
//!
//! Code generated by `bsql::query!` calls methods on this trait. `Pool`,
//! `PoolConnection`, and `Transaction` all implement it.
//!
//! The associated type `Rows` controls the return wrapper:
//! - `Pool` returns `Arc<[Row]>` (singleflight needs shared ownership).
//! - `PoolConnection` and `Transaction` return `Vec<Row>` (no sharing,
//!   avoids ~20-40ns atomic ops + Arc control-block allocation per query).
//!
//! Generated code only uses `Deref<Target = [Row]>` operations (`.len()`,
//! `.iter()`, `&rows[i]`), so it works identically with both types.
//!
//! v0.7: `query_raw_readonly` routes SELECT queries to replicas when
//! read/write splitting is configured. Falls through to primary otherwise.
16
17use std::sync::Arc;
18
19use tokio_postgres::types::ToSql;
20
21use crate::error::{BsqlError, BsqlResult};
22use crate::pool::{Pool, PoolConnection};
23use crate::transaction::Transaction;
24
25/// Execute a prepared query and return raw rows.
26///
27/// This trait is sealed -- it cannot be implemented outside of bsql-core.
28/// The generated code calls `query_raw`, `query_raw_readonly`, and
29/// `execute_raw` on `&Pool`, `&PoolConnection`, or `&Transaction`.
30pub trait Executor: sealed::Sealed {
31    /// Row container returned by `query_raw` / `query_raw_readonly`.
32    ///
33    /// Must deref to `[Row]` so generated code can use `.len()`, `.iter()`,
34    /// and indexing uniformly. `Pool` uses `Arc<[Row]>` for singleflight
35    /// sharing; `PoolConnection` and `Transaction` use plain `Vec<Row>`.
36    type Rows: std::ops::Deref<Target = [tokio_postgres::Row]> + Send;
37
38    /// Execute a query and return all rows.
39    fn query_raw(
40        &self,
41        sql: &str,
42        params: &[&(dyn ToSql + Sync)],
43    ) -> impl std::future::Future<Output = BsqlResult<Self::Rows>> + Send;
44
45    /// Execute a read-only query. Routes to replicas when available.
46    ///
47    /// For `Pool`: routes to replica pool if replicas are configured,
48    /// otherwise falls through to primary.
49    /// For `PoolConnection`/`Transaction`: identical to `query_raw`
50    /// (connection is already bound).
51    fn query_raw_readonly(
52        &self,
53        sql: &str,
54        params: &[&(dyn ToSql + Sync)],
55    ) -> impl std::future::Future<Output = BsqlResult<Self::Rows>> + Send;
56
57    /// Execute a query and return the number of affected rows.
58    fn execute_raw(
59        &self,
60        sql: &str,
61        params: &[&(dyn ToSql + Sync)],
62    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send;
63}
64
65mod sealed {
66    pub trait Sealed {}
67    impl Sealed for super::Pool {}
68    impl Sealed for super::PoolConnection {}
69    impl Sealed for super::Transaction {}
70}
71
72impl Executor for Pool {
73    type Rows = Arc<[tokio_postgres::Row]>;
74
75    async fn query_raw(
76        &self,
77        sql: &str,
78        params: &[&(dyn ToSql + Sync)],
79    ) -> BsqlResult<Arc<[tokio_postgres::Row]>> {
80        self.query_raw_primary(sql, params).await
81    }
82
83    async fn query_raw_readonly(
84        &self,
85        sql: &str,
86        params: &[&(dyn ToSql + Sync)],
87    ) -> BsqlResult<Arc<[tokio_postgres::Row]>> {
88        self.query_raw_read(sql, params).await
89    }
90
91    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
92        let conn = self.acquire().await?;
93        conn.execute_raw(sql, params).await
94    }
95}
96
97impl Executor for PoolConnection {
98    type Rows = Vec<tokio_postgres::Row>;
99
100    async fn query_raw(
101        &self,
102        sql: &str,
103        params: &[&(dyn ToSql + Sync)],
104    ) -> BsqlResult<Vec<tokio_postgres::Row>> {
105        // prepare_cached: prepares on first use, reuses on subsequent calls.
106        // Named statements persist across borrows of the same connection.
107        let stmt = self
108            .inner
109            .prepare_cached(sql)
110            .await
111            .map_err(BsqlError::from)?;
112
113        self.inner
114            .query(&stmt, params)
115            .await
116            .map_err(BsqlError::from)
117    }
118
119    async fn query_raw_readonly(
120        &self,
121        sql: &str,
122        params: &[&(dyn ToSql + Sync)],
123    ) -> BsqlResult<Vec<tokio_postgres::Row>> {
124        // PoolConnection is already bound -- no routing.
125        self.query_raw(sql, params).await
126    }
127
128    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
129        let stmt = self
130            .inner
131            .prepare_cached(sql)
132            .await
133            .map_err(BsqlError::from)?;
134
135        self.inner
136            .execute(&stmt, params)
137            .await
138            .map_err(BsqlError::from)
139    }
140}
141
142impl Executor for Transaction {
143    type Rows = Vec<tokio_postgres::Row>;
144
145    async fn query_raw(
146        &self,
147        sql: &str,
148        params: &[&(dyn ToSql + Sync)],
149    ) -> BsqlResult<Vec<tokio_postgres::Row>> {
150        self.ensure_begun().await?;
151        self.connection().query_raw(sql, params).await
152    }
153
154    async fn query_raw_readonly(
155        &self,
156        sql: &str,
157        params: &[&(dyn ToSql + Sync)],
158    ) -> BsqlResult<Vec<tokio_postgres::Row>> {
159        // Transaction is bound to a single connection -- no routing.
160        self.query_raw(sql, params).await
161    }
162
163    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
164        self.ensure_begun().await?;
165        self.connection().execute_raw(sql, params).await
166    }
167}