Skip to main content

bsql_core/
executor.rs

1//! The `Executor` trait -- the runtime contract between generated code and the pool.
2//!
3//! Code generated by `bsql::query!` calls methods on this trait. `Pool`,
4//! `PoolConnection`, and `Transaction` all implement it.
5//!
6//! v0.7: `query_raw` returns `Arc<Vec<Row>>` to enable zero-copy singleflight
7//! sharing. `Arc<Vec<Row>>` derefs to `Vec<Row>`, so `.iter()`, `.len()`,
8//! and `&rows[i]` all work identically. The generated code is unaffected.
9//!
10//! v0.7: `query_raw_readonly` routes SELECT queries to replicas when
11//! read/write splitting is configured. Falls through to primary otherwise.
12
13use std::sync::Arc;
14
15use tokio_postgres::types::ToSql;
16
17use crate::error::{BsqlError, BsqlResult};
18use crate::pool::{Pool, PoolConnection};
19use crate::transaction::Transaction;
20
21/// Execute a prepared query and return raw rows.
22///
23/// This trait is sealed -- it cannot be implemented outside of bsql-core.
24/// The generated code calls `query_raw`, `query_raw_readonly`, and
25/// `execute_raw` on `&Pool`, `&PoolConnection`, or `&Transaction`.
26pub trait Executor: sealed::Sealed {
27    /// Execute a query and return all rows (shared via Arc for singleflight).
28    fn query_raw(
29        &self,
30        sql: &str,
31        params: &[&(dyn ToSql + Sync)],
32    ) -> impl std::future::Future<Output = BsqlResult<Arc<Vec<tokio_postgres::Row>>>> + Send;
33
34    /// Execute a read-only query. Routes to replicas when available.
35    ///
36    /// For `Pool`: routes to replica pool if replicas are configured,
37    /// otherwise falls through to primary.
38    /// For `PoolConnection`/`Transaction`: identical to `query_raw`
39    /// (connection is already bound).
40    fn query_raw_readonly(
41        &self,
42        sql: &str,
43        params: &[&(dyn ToSql + Sync)],
44    ) -> impl std::future::Future<Output = BsqlResult<Arc<Vec<tokio_postgres::Row>>>> + Send;
45
46    /// Execute a query and return the number of affected rows.
47    fn execute_raw(
48        &self,
49        sql: &str,
50        params: &[&(dyn ToSql + Sync)],
51    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send;
52}
53
54mod sealed {
55    pub trait Sealed {}
56    impl Sealed for super::Pool {}
57    impl Sealed for super::PoolConnection {}
58    impl Sealed for super::Transaction {}
59}
60
61impl Executor for Pool {
62    async fn query_raw(
63        &self,
64        sql: &str,
65        params: &[&(dyn ToSql + Sync)],
66    ) -> BsqlResult<Arc<Vec<tokio_postgres::Row>>> {
67        self.query_raw_primary(sql, params).await
68    }
69
70    async fn query_raw_readonly(
71        &self,
72        sql: &str,
73        params: &[&(dyn ToSql + Sync)],
74    ) -> BsqlResult<Arc<Vec<tokio_postgres::Row>>> {
75        self.query_raw_read(sql, params).await
76    }
77
78    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
79        let conn = self.acquire().await?;
80        conn.execute_raw(sql, params).await
81    }
82}
83
84impl Executor for PoolConnection {
85    async fn query_raw(
86        &self,
87        sql: &str,
88        params: &[&(dyn ToSql + Sync)],
89    ) -> BsqlResult<Arc<Vec<tokio_postgres::Row>>> {
90        // prepare_cached: prepares on first use, reuses on subsequent calls.
91        // Named statements persist across borrows of the same connection.
92        let stmt = self
93            .inner
94            .prepare_cached(sql)
95            .await
96            .map_err(BsqlError::from)?;
97
98        let rows = self
99            .inner
100            .query(&stmt, params)
101            .await
102            .map_err(BsqlError::from)?;
103
104        Ok(Arc::new(rows))
105    }
106
107    async fn query_raw_readonly(
108        &self,
109        sql: &str,
110        params: &[&(dyn ToSql + Sync)],
111    ) -> BsqlResult<Arc<Vec<tokio_postgres::Row>>> {
112        // PoolConnection is already bound -- no routing.
113        self.query_raw(sql, params).await
114    }
115
116    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
117        let stmt = self
118            .inner
119            .prepare_cached(sql)
120            .await
121            .map_err(BsqlError::from)?;
122
123        self.inner
124            .execute(&stmt, params)
125            .await
126            .map_err(BsqlError::from)
127    }
128}
129
130impl Executor for Transaction {
131    async fn query_raw(
132        &self,
133        sql: &str,
134        params: &[&(dyn ToSql + Sync)],
135    ) -> BsqlResult<Arc<Vec<tokio_postgres::Row>>> {
136        self.connection().query_raw(sql, params).await
137    }
138
139    async fn query_raw_readonly(
140        &self,
141        sql: &str,
142        params: &[&(dyn ToSql + Sync)],
143    ) -> BsqlResult<Arc<Vec<tokio_postgres::Row>>> {
144        // Transaction is bound to a single connection -- no routing.
145        self.query_raw(sql, params).await
146    }
147
148    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
149        self.connection().execute_raw(sql, params).await
150    }
151}