// bsql-core 0.8.0
//
// Runtime support for bsql — compile-time safe SQL for Rust
// Documentation
//! The `Executor` trait -- the runtime contract between generated code and the pool.
//!
//! Code generated by `bsql::query!` calls methods on this trait. `Pool`,
//! `PoolConnection`, and `Transaction` all implement it.
//!
//! The associated type `Rows` controls the return wrapper:
//! - `Pool` returns `Arc<[Row]>` (singleflight needs shared ownership).
//! - `PoolConnection` and `Transaction` return `Vec<Row>` (no sharing,
//!   avoids ~20-40ns atomic ops + Arc control-block allocation per query).
//!
//! Generated code only uses `Deref<Target = [Row]>` operations (`.len()`,
//! `.iter()`, `&rows[i]`), so it works identically with both types.
//!
//! v0.7: `query_raw_readonly` routes SELECT queries to replicas when
//! read/write splitting is configured. Falls through to primary otherwise.

use std::sync::Arc;

use tokio_postgres::types::ToSql;

use crate::error::{BsqlError, BsqlResult};
use crate::pool::{Pool, PoolConnection};
use crate::transaction::Transaction;

/// Runtime contract between code generated by `bsql::query!` and a database handle.
///
/// Implemented in this module for `Pool`, `PoolConnection`, and `Transaction`.
/// The generated code calls `query_raw`, `query_raw_readonly`, and
/// `execute_raw` on `&Pool`, `&PoolConnection`, or `&Transaction`.
///
/// This trait is sealed -- the private `sealed::Sealed` supertrait means it
/// cannot be implemented outside of bsql-core.
///
/// Every method is declared as `fn ... -> impl Future + Send` so that the
/// returned future is `Send`; the implementations in this module write them
/// as `async fn`. Failures are reported through `BsqlResult`.
pub trait Executor: sealed::Sealed {
    /// Row container returned by `query_raw` / `query_raw_readonly`.
    ///
    /// Must deref to `[Row]` so generated code can use `.len()`, `.iter()`,
    /// and indexing uniformly. `Pool` uses `Arc<[Row]>` for singleflight
    /// sharing; `PoolConnection` and `Transaction` use plain `Vec<Row>`.
    /// The container must also be `Send`.
    type Rows: std::ops::Deref<Target = [tokio_postgres::Row]> + Send;

    /// Execute a query and return all rows.
    ///
    /// * `sql` -- the SQL text to run.
    /// * `params` -- the values passed along with `sql` as query parameters.
    ///
    /// Resolves to the full row set wrapped in `Self::Rows`, or to an error.
    fn query_raw(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<Self::Rows>> + Send;

    /// Execute a read-only query. Routes to replicas when available.
    ///
    /// For `Pool`: routes to replica pool if replicas are configured,
    /// otherwise falls through to primary.
    /// For `PoolConnection`/`Transaction`: identical to `query_raw`
    /// (connection is already bound).
    ///
    /// Takes the same `sql` / `params` arguments as `query_raw` and resolves
    /// to the same `Self::Rows` container.
    fn query_raw_readonly(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<Self::Rows>> + Send;

    /// Execute a query and return the number of affected rows.
    ///
    /// Takes the same `sql` / `params` arguments as `query_raw`, but resolves
    /// to a `u64` count instead of a row container.
    fn execute_raw(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send;
}

mod sealed {
    pub trait Sealed {}
    impl Sealed for super::Pool {}
    impl Sealed for super::PoolConnection {}
    impl Sealed for super::Transaction {}
}

/// `Executor` for the pool itself: each call delegates to a pool-level helper
/// or checks out a connection for the duration of that single statement.
impl Executor for Pool {
    /// Reference-counted row slice.
    type Rows = Arc<[tokio_postgres::Row]>;

    /// Forwards to `Pool::query_raw_primary` and returns its result unchanged.
    async fn query_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<Self::Rows> {
        self.query_raw_primary(sql, params).await
    }

    /// Forwards to `Pool::query_raw_read` and returns its result unchanged.
    async fn query_raw_readonly(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> BsqlResult<Self::Rows> {
        self.query_raw_read(sql, params).await
    }

    /// Acquires a connection from the pool, then runs the statement on it.
    ///
    /// An acquisition failure is returned before the statement is attempted.
    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
        let checked_out = self.acquire().await?;
        checked_out.execute_raw(sql, params).await
    }
}

/// `Executor` for a single checked-out connection.
///
/// Both statement paths go through `prepare_cached` on the inner client before
/// running, and convert driver errors into `BsqlError`.
impl Executor for PoolConnection {
    /// Plain owned vector of rows.
    type Rows = Vec<tokio_postgres::Row>;

    /// Prepares `sql` via the connection's statement cache, then fetches all rows.
    async fn query_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<Self::Rows> {
        // `?` performs the same `BsqlError::from` conversion on the error path.
        let statement = self.inner.prepare_cached(sql).await?;
        Ok(self.inner.query(&statement, params).await?)
    }

    /// Same as `query_raw`: this handle is already bound to one connection,
    /// so there is nothing to route.
    async fn query_raw_readonly(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> BsqlResult<Self::Rows> {
        self.query_raw(sql, params).await
    }

    /// Prepares `sql` via the connection's statement cache, then executes it
    /// and returns the affected-row count.
    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
        let statement = self.inner.prepare_cached(sql).await?;
        Ok(self.inner.execute(&statement, params).await?)
    }
}

/// `Executor` for a transaction handle.
///
/// Each statement first awaits `ensure_begun()` and returns its error if that
/// fails, then runs on the handle returned by `connection()`.
impl Executor for Transaction {
    /// Plain owned vector of rows.
    type Rows = Vec<tokio_postgres::Row>;

    /// Runs a row-returning statement on the transaction's connection.
    async fn query_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<Self::Rows> {
        self.ensure_begun().await?;
        let conn = self.connection();
        conn.query_raw(sql, params).await
    }

    /// Same as `query_raw`: a transaction stays on its single connection,
    /// so there is nothing to route.
    async fn query_raw_readonly(
        &self,
        sql: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> BsqlResult<Self::Rows> {
        self.query_raw(sql, params).await
    }

    /// Runs a statement on the transaction's connection and returns the
    /// affected-row count.
    async fn execute_raw(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> BsqlResult<u64> {
        self.ensure_begun().await?;
        let conn = self.connection();
        conn.execute_raw(sql, params).await
    }
}