hyperdb_api/prepared.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level prepared statements.
5//!
6//! [`PreparedStatement`] wraps [`hyperdb_api_core::client::OwnedPreparedStatement`]
7//! and integrates it with the rest of the hyperdb-api surface:
8//!
9//! - Returns [`Rowset`](crate::Rowset) from streaming executions, so
10//! row decoding, schema capture, and `Row::get::<T>()` work exactly
11//! the same way as with [`Connection::execute_query`](crate::Connection::execute_query).
12//! - `execute` / `fetch_one` / `fetch_optional` / `fetch_all` /
13//! `fetch_scalar` mirror the same helpers on `Connection`.
14//! - `OwnedPreparedStatement` already auto-closes on Drop at the lower
15//! layer, so this wrapper needs no additional cleanup logic.
16
17use std::sync::Arc;
18
19use hyperdb_api_core::client::OwnedPreparedStatement;
20use hyperdb_api_core::types::Oid;
21
22use crate::connection::Connection;
23use crate::error::{Error, Result};
24use crate::params::ToSqlParam;
25use crate::result::{ResultColumn, ResultSchema, Row, RowValue, Rowset};
26use crate::transport::Transport;
27
28/// A handle to a server-side prepared statement.
29///
30/// Construct via [`Connection::prepare`] or
31/// [`Connection::prepare_typed`]. Holding this type keeps the statement
32/// allocated on the server; it is released automatically when the handle
33/// is dropped.
34///
35/// # Reuse
36///
37/// A single `PreparedStatement` can be executed many times with different
38/// parameter values — the server caches the parsed plan. This is the
39/// primary reason to use prepared statements over
40/// [`Connection::query_params`] for loops over user input.
41#[derive(Debug)]
42pub struct PreparedStatement<'conn> {
43 connection: &'conn Connection,
44 inner: OwnedPreparedStatement,
45 schema: Arc<ResultSchema>,
46}
47
48impl<'conn> PreparedStatement<'conn> {
49 #[expect(
50 clippy::unnecessary_wraps,
51 reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
52 )]
53 pub(crate) fn new(
54 connection: &'conn Connection,
55 inner: OwnedPreparedStatement,
56 ) -> Result<Self> {
57 let schema = build_schema_from_columns(inner.columns());
58 Ok(Self {
59 connection,
60 inner,
61 schema: Arc::new(schema),
62 })
63 }
64
65 /// Returns the number of parameters the statement expects.
66 #[must_use]
67 pub fn param_count(&self) -> usize {
68 self.inner.param_count()
69 }
70
71 /// Returns the parameter type OIDs (as the server inferred or the
72 /// caller explicitly passed to [`Connection::prepare_typed`]).
73 #[must_use]
74 pub fn param_types(&self) -> &[Oid] {
75 self.inner.param_types()
76 }
77
78 /// Returns the result-column schema. Always available — it was
79 /// captured during the Parse/Describe at prepare time.
80 #[must_use]
81 pub fn schema(&self) -> &ResultSchema {
82 &self.schema
83 }
84
85 /// The original SQL text.
86 #[must_use]
87 pub fn sql(&self) -> &str {
88 self.inner.query()
89 }
90
91 /// Executes the statement and returns a streaming [`Rowset`].
92 ///
93 /// Memory stays bounded to one chunk regardless of result size —
94 /// the prepared-statement equivalent of
95 /// [`Connection::execute_query`].
96 ///
97 /// # Errors
98 ///
99 /// - Returns [`Error::Other`] if the underlying [`Connection`] is on
100 /// gRPC transport (prepared statements are TCP-only).
101 /// - Returns [`Error::Client`] if the server rejects `Bind` or
102 /// `Execute` (type mismatch, runtime error while streaming).
103 /// - Returns [`Error::Io`] on transport-level I/O failures.
104 pub fn query(&self, params: &[&dyn ToSqlParam]) -> Result<Rowset<'conn>> {
105 let encoded = encode_params(params);
106 let client = tcp_client(self.connection)?;
107 let stream = client.execute_streaming(
108 &self.inner,
109 encoded,
110 crate::result::DEFAULT_BINARY_CHUNK_SIZE,
111 )?;
112 Ok(Rowset::from_prepared(stream))
113 }
114
115 /// Executes the statement as a command (INSERT / UPDATE / DELETE /
116 /// DDL) and returns the affected-row count.
117 ///
118 /// # Errors
119 ///
120 /// - Returns [`Error::Other`] on gRPC transport.
121 /// - Returns [`Error::Client`] if the server rejects `Bind` or
122 /// `Execute`.
123 /// - Returns [`Error::Io`] on transport-level I/O failures.
124 pub fn execute(&self, params: &[&dyn ToSqlParam]) -> Result<u64> {
125 let encoded = encode_params(params);
126 let client = tcp_client(self.connection)?;
127 Ok(client.execute_no_result(&self.inner, encoded)?)
128 }
129
130 /// Fetches exactly one row; errors if the result is empty.
131 ///
132 /// # Errors
133 ///
134 /// - Returns the error from [`query`](Self::query).
135 /// - Returns [`Error::Other`] with message `"Query returned no rows"`
136 /// if the result is empty.
137 pub fn fetch_one(&self, params: &[&dyn ToSqlParam]) -> Result<Row> {
138 self.query(params)?.require_first_row()
139 }
140
141 /// Fetches at most one row; returns `None` if the result is empty.
142 ///
143 /// # Errors
144 ///
145 /// Returns the error from [`query`](Self::query); an empty result
146 /// yields `Ok(None)`.
147 pub fn fetch_optional(&self, params: &[&dyn ToSqlParam]) -> Result<Option<Row>> {
148 self.query(params)?.first_row()
149 }
150
151 /// Fetches every row into a `Vec`.
152 ///
153 /// # Errors
154 ///
155 /// Returns the error from [`query`](Self::query), or a transport error
156 /// produced while draining every chunk.
157 pub fn fetch_all(&self, params: &[&dyn ToSqlParam]) -> Result<Vec<Row>> {
158 self.query(params)?.collect_rows()
159 }
160
161 /// Fetches a single non-NULL scalar; errors on empty / NULL.
162 ///
163 /// # Errors
164 ///
165 /// - Returns the error from [`query`](Self::query).
166 /// - Returns [`Error::Other`] with message `"Query returned no rows"`
167 /// if the result is empty.
168 /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
169 /// if the first cell is SQL `NULL`.
170 pub fn fetch_scalar<T: RowValue>(&self, params: &[&dyn ToSqlParam]) -> Result<T> {
171 self.query(params)?.require_scalar()
172 }
173
174 /// Fetches a single scalar, allowing NULL as `None`.
175 ///
176 /// # Errors
177 ///
178 /// Returns the error from [`query`](Self::query). An empty result
179 /// still errors (see [`fetch_scalar`](Self::fetch_scalar)); SQL `NULL`
180 /// yields `Ok(None)`.
181 pub fn fetch_optional_scalar<T: RowValue>(
182 &self,
183 params: &[&dyn ToSqlParam],
184 ) -> Result<Option<T>> {
185 self.query(params)?.scalar()
186 }
187}
188
189/// Encode a slice of `&dyn ToSqlParam` into the binary-bytes form the
190/// prepared-statement Bind message expects. `None` encodes SQL NULL.
191pub(crate) fn encode_params(params: &[&dyn ToSqlParam]) -> Vec<Option<Vec<u8>>> {
192 params.iter().map(|p| p.encode_param()).collect()
193}
194
195/// Extract the underlying sync TCP client or error with a clear message
196/// if the connection is on gRPC.
197pub(crate) fn tcp_client(connection: &Connection) -> Result<&hyperdb_api_core::client::Client> {
198 match connection.transport() {
199 Transport::Tcp(tcp) => Ok(&tcp.client),
200 Transport::Grpc(_) => Err(Error::new(
201 "prepared statements are not supported over gRPC transport",
202 )),
203 }
204}
205
206/// Build a `ResultSchema` from a slice of `hyperdb_api_core::client::Column`, using
207/// `SqlType::from_oid_and_modifier` so NUMERIC / VARCHAR modifiers are
208/// preserved.
209fn build_schema_from_columns(cols: &[hyperdb_api_core::client::Column]) -> ResultSchema {
210 let columns = cols
211 .iter()
212 .enumerate()
213 .map(|(idx, col)| {
214 let sql_type = hyperdb_api_core::types::SqlType::from_oid_and_modifier(
215 col.type_oid().0,
216 col.type_modifier(),
217 );
218 ResultColumn::new(col.name(), sql_type, idx)
219 })
220 .collect();
221 ResultSchema::from_columns(columns)
222}