Skip to main content

hyperdb_api/
prepared.rs

// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! High-level prepared statements.
//!
//! [`PreparedStatement`] wraps [`hyperdb_api_core::client::OwnedPreparedStatement`]
//! and integrates it with the rest of the hyperdb-api surface:
//!
//! - Returns [`Rowset`](crate::Rowset) from streaming executions, so
//!   row decoding, schema capture, and `Row::get::<T>()` work exactly
//!   the same way as with [`Connection::execute_query`](crate::Connection::execute_query).
//! - `execute` / `fetch_one` / `fetch_optional` / `fetch_all` /
//!   `fetch_scalar` mirror the same helpers on `Connection`.
//! - `OwnedPreparedStatement` already auto-closes on Drop at the lower
//!   layer, so this wrapper needs no additional cleanup logic.
16
17use std::sync::Arc;
18
19use hyperdb_api_core::client::OwnedPreparedStatement;
20use hyperdb_api_core::types::Oid;
21
22use crate::connection::Connection;
23use crate::error::{Error, Result};
24use crate::params::ToSqlParam;
25use crate::result::{ResultColumn, ResultSchema, Row, RowValue, Rowset};
26use crate::transport::Transport;
27
28/// A handle to a server-side prepared statement.
29///
30/// Construct via [`Connection::prepare`] or
31/// [`Connection::prepare_typed`]. Holding this type keeps the statement
32/// allocated on the server; it is released automatically when the handle
33/// is dropped.
34///
35/// # Reuse
36///
37/// A single `PreparedStatement` can be executed many times with different
38/// parameter values — the server caches the parsed plan. This is the
39/// primary reason to use prepared statements over
40/// [`Connection::query_params`] for loops over user input.
41#[derive(Debug)]
42pub struct PreparedStatement<'conn> {
43    connection: &'conn Connection,
44    inner: OwnedPreparedStatement,
45    schema: Arc<ResultSchema>,
46}
47
48impl<'conn> PreparedStatement<'conn> {
49    #[expect(
50        clippy::unnecessary_wraps,
51        reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
52    )]
53    pub(crate) fn new(
54        connection: &'conn Connection,
55        inner: OwnedPreparedStatement,
56    ) -> Result<Self> {
57        let schema = build_schema_from_columns(inner.columns());
58        Ok(Self {
59            connection,
60            inner,
61            schema: Arc::new(schema),
62        })
63    }
64
65    /// Returns the number of parameters the statement expects.
66    #[must_use]
67    pub fn param_count(&self) -> usize {
68        self.inner.param_count()
69    }
70
71    /// Returns the parameter type OIDs (as the server inferred or the
72    /// caller explicitly passed to [`Connection::prepare_typed`]).
73    #[must_use]
74    pub fn param_types(&self) -> &[Oid] {
75        self.inner.param_types()
76    }
77
78    /// Returns the result-column schema. Always available — it was
79    /// captured during the Parse/Describe at prepare time.
80    #[must_use]
81    pub fn schema(&self) -> &ResultSchema {
82        &self.schema
83    }
84
85    /// The original SQL text.
86    #[must_use]
87    pub fn sql(&self) -> &str {
88        self.inner.query()
89    }
90
91    /// Executes the statement and returns a streaming [`Rowset`].
92    ///
93    /// Memory stays bounded to one chunk regardless of result size —
94    /// the prepared-statement equivalent of
95    /// [`Connection::execute_query`].
96    ///
97    /// # Errors
98    ///
99    /// - Returns [`Error::Other`] if the underlying [`Connection`] is on
100    ///   gRPC transport (prepared statements are TCP-only).
101    /// - Returns [`Error::Client`] if the server rejects `Bind` or
102    ///   `Execute` (type mismatch, runtime error while streaming).
103    /// - Returns [`Error::Io`] on transport-level I/O failures.
104    pub fn query(&self, params: &[&dyn ToSqlParam]) -> Result<Rowset<'conn>> {
105        let encoded = encode_params(params);
106        let client = tcp_client(self.connection)?;
107        let stream = client.execute_streaming(
108            &self.inner,
109            encoded,
110            crate::result::DEFAULT_BINARY_CHUNK_SIZE,
111        )?;
112        Ok(Rowset::from_prepared(stream))
113    }
114
115    /// Executes the statement as a command (INSERT / UPDATE / DELETE /
116    /// DDL) and returns the affected-row count.
117    ///
118    /// # Errors
119    ///
120    /// - Returns [`Error::Other`] on gRPC transport.
121    /// - Returns [`Error::Client`] if the server rejects `Bind` or
122    ///   `Execute`.
123    /// - Returns [`Error::Io`] on transport-level I/O failures.
124    pub fn execute(&self, params: &[&dyn ToSqlParam]) -> Result<u64> {
125        let encoded = encode_params(params);
126        let client = tcp_client(self.connection)?;
127        Ok(client.execute_no_result(&self.inner, encoded)?)
128    }
129
130    /// Fetches exactly one row; errors if the result is empty.
131    ///
132    /// # Errors
133    ///
134    /// - Returns the error from [`query`](Self::query).
135    /// - Returns [`Error::Other`] with message `"Query returned no rows"`
136    ///   if the result is empty.
137    pub fn fetch_one(&self, params: &[&dyn ToSqlParam]) -> Result<Row> {
138        self.query(params)?.require_first_row()
139    }
140
141    /// Fetches at most one row; returns `None` if the result is empty.
142    ///
143    /// # Errors
144    ///
145    /// Returns the error from [`query`](Self::query); an empty result
146    /// yields `Ok(None)`.
147    pub fn fetch_optional(&self, params: &[&dyn ToSqlParam]) -> Result<Option<Row>> {
148        self.query(params)?.first_row()
149    }
150
151    /// Fetches every row into a `Vec`.
152    ///
153    /// # Errors
154    ///
155    /// Returns the error from [`query`](Self::query), or a transport error
156    /// produced while draining every chunk.
157    pub fn fetch_all(&self, params: &[&dyn ToSqlParam]) -> Result<Vec<Row>> {
158        self.query(params)?.collect_rows()
159    }
160
161    /// Fetches a single non-NULL scalar; errors on empty / NULL.
162    ///
163    /// # Errors
164    ///
165    /// - Returns the error from [`query`](Self::query).
166    /// - Returns [`Error::Other`] with message `"Query returned no rows"`
167    ///   if the result is empty.
168    /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
169    ///   if the first cell is SQL `NULL`.
170    pub fn fetch_scalar<T: RowValue>(&self, params: &[&dyn ToSqlParam]) -> Result<T> {
171        self.query(params)?.require_scalar()
172    }
173
174    /// Fetches a single scalar, allowing NULL as `None`.
175    ///
176    /// # Errors
177    ///
178    /// Returns the error from [`query`](Self::query). An empty result
179    /// still errors (see [`fetch_scalar`](Self::fetch_scalar)); SQL `NULL`
180    /// yields `Ok(None)`.
181    pub fn fetch_optional_scalar<T: RowValue>(
182        &self,
183        params: &[&dyn ToSqlParam],
184    ) -> Result<Option<T>> {
185        self.query(params)?.scalar()
186    }
187}
188
189/// Encode a slice of `&dyn ToSqlParam` into the binary-bytes form the
190/// prepared-statement Bind message expects. `None` encodes SQL NULL.
191pub(crate) fn encode_params(params: &[&dyn ToSqlParam]) -> Vec<Option<Vec<u8>>> {
192    params.iter().map(|p| p.encode_param()).collect()
193}
194
195/// Extract the underlying sync TCP client or error with a clear message
196/// if the connection is on gRPC.
197pub(crate) fn tcp_client(connection: &Connection) -> Result<&hyperdb_api_core::client::Client> {
198    match connection.transport() {
199        Transport::Tcp(tcp) => Ok(&tcp.client),
200        Transport::Grpc(_) => Err(Error::new(
201            "prepared statements are not supported over gRPC transport",
202        )),
203    }
204}
205
206/// Build a `ResultSchema` from a slice of `hyperdb_api_core::client::Column`, using
207/// `SqlType::from_oid_and_modifier` so NUMERIC / VARCHAR modifiers are
208/// preserved.
209fn build_schema_from_columns(cols: &[hyperdb_api_core::client::Column]) -> ResultSchema {
210    let columns = cols
211        .iter()
212        .enumerate()
213        .map(|(idx, col)| {
214            let sql_type = hyperdb_api_core::types::SqlType::from_oid_and_modifier(
215                col.type_oid().0,
216                col.type_modifier(),
217            );
218            ResultColumn::new(col.name(), sql_type, idx)
219        })
220        .collect();
221    ResultSchema::from_columns(columns)
222}