hyperdb_api/async_prepared.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level async prepared statements.
5//!
6//! Async mirror of [`PreparedStatement`](crate::PreparedStatement); see
7//! that type's docs for the design rationale.
8
9use std::sync::Arc;
10
11use hyperdb_api_core::client::AsyncPreparedStatement as LowLevelAsyncPreparedStatement;
12use hyperdb_api_core::types::Oid;
13
14use crate::async_connection::AsyncConnection;
15use crate::async_result::AsyncRowset;
16use crate::async_transport::AsyncTransport;
17use crate::error::{Error, Result};
18use crate::params::ToSqlParam;
19use crate::result::{ResultColumn, ResultSchema, Row, RowValue};
20
21/// A handle to a server-side prepared statement (async).
22///
23/// Construct via [`AsyncConnection::prepare`] or
24/// [`AsyncConnection::prepare_typed`]. Holding this type keeps the
25/// statement allocated on the server; it is released automatically when
26/// the handle is dropped (best-effort — see
27/// [`hyperdb_api_core::client::AsyncPreparedStatement`] for the Drop semantics).
28/// The owned variant [`AsyncPreparedStatementOwned`] also provides an
29/// explicit [`close`](AsyncPreparedStatementOwned::close) method for
30/// callers that want deterministic cleanup.
#[derive(Debug)]
pub struct AsyncPreparedStatement<'conn> {
    /// Connection the statement was prepared on, borrowed for `'conn`; every
    /// execution resolves its transport client through this reference.
    connection: &'conn AsyncConnection,
    /// Low-level statement handle; supplies the parameter metadata, column
    /// metadata and SQL text exposed by the accessors.
    inner: LowLevelAsyncPreparedStatement,
    /// Result-column schema, built once in `new` from `inner.columns()`.
    schema: Arc<ResultSchema>,
}
37
38impl<'conn> AsyncPreparedStatement<'conn> {
39 #[expect(
40 clippy::unnecessary_wraps,
41 reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
42 )]
43 pub(crate) fn new(
44 connection: &'conn AsyncConnection,
45 inner: LowLevelAsyncPreparedStatement,
46 ) -> Result<Self> {
47 let schema = build_schema_from_columns(inner.columns());
48 Ok(Self {
49 connection,
50 inner,
51 schema: Arc::new(schema),
52 })
53 }
54
55 /// Number of parameters the statement expects.
56 #[must_use]
57 pub fn param_count(&self) -> usize {
58 self.inner.param_count()
59 }
60
61 /// Parameter type OIDs.
62 #[must_use]
63 pub fn param_types(&self) -> &[Oid] {
64 self.inner.param_types()
65 }
66
67 /// Result-column schema, always available (captured at prepare time).
68 #[must_use]
69 pub fn schema(&self) -> &ResultSchema {
70 &self.schema
71 }
72
73 /// The original SQL text.
74 #[must_use]
75 pub fn sql(&self) -> &str {
76 self.inner.query()
77 }
78
79 /// Executes the statement and returns a streaming [`AsyncRowset`].
80 ///
81 /// # Errors
82 ///
83 /// - Returns [`Error::Other`] on gRPC transport.
84 /// - Returns [`Error::Client`] if the server rejects `Bind` or
85 /// `Execute`.
86 /// - Returns [`Error::Io`] on transport-level I/O failures.
87 pub async fn query(&self, params: &[&dyn ToSqlParam]) -> Result<AsyncRowset<'conn>> {
88 let encoded = encode_params(params);
89 let client = async_tcp_client(self.connection)?;
90 let stream = client
91 .execute_prepared_streaming(
92 &self.inner,
93 encoded,
94 crate::result::DEFAULT_BINARY_CHUNK_SIZE,
95 )
96 .await?;
97 Ok(AsyncRowset::from_prepared(stream))
98 }
99
100 /// Executes the statement as a command and returns the affected-row
101 /// count (async).
102 ///
103 /// # Errors
104 ///
105 /// - Returns [`Error::Other`] on gRPC transport.
106 /// - Returns [`Error::Client`] if the server rejects `Bind` or
107 /// `Execute`.
108 /// - Returns [`Error::Io`] on transport-level I/O failures.
109 pub async fn execute(&self, params: &[&dyn ToSqlParam]) -> Result<u64> {
110 let encoded = encode_params(params);
111 let client = async_tcp_client(self.connection)?;
112 Ok(client
113 .execute_prepared_no_result(&self.inner, encoded)
114 .await?)
115 }
116
117 /// Fetches exactly one row; errors if the result is empty.
118 ///
119 /// # Errors
120 ///
121 /// - Returns the error from [`query`](Self::query).
122 /// - Returns [`Error::Other`] with message `"Query returned no rows"`
123 /// if the result is empty.
124 pub async fn fetch_one(&self, params: &[&dyn ToSqlParam]) -> Result<Row> {
125 self.query(params).await?.require_first_row().await
126 }
127
128 /// Fetches at most one row; returns `None` if the result is empty.
129 ///
130 /// # Errors
131 ///
132 /// Returns the error from [`query`](Self::query); an empty result
133 /// yields `Ok(None)`.
134 pub async fn fetch_optional(&self, params: &[&dyn ToSqlParam]) -> Result<Option<Row>> {
135 self.query(params).await?.first_row().await
136 }
137
138 /// Fetches every row into a `Vec`.
139 ///
140 /// # Errors
141 ///
142 /// Returns the error from [`query`](Self::query), or a transport
143 /// error produced while draining every chunk.
144 pub async fn fetch_all(&self, params: &[&dyn ToSqlParam]) -> Result<Vec<Row>> {
145 self.query(params).await?.collect_rows().await
146 }
147
148 /// Fetches a single non-NULL scalar; errors on empty / NULL.
149 ///
150 /// # Errors
151 ///
152 /// - Returns the error from [`query`](Self::query).
153 /// - Returns [`Error::Other`] with message `"Query returned no rows"`
154 /// if the result is empty.
155 /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
156 /// if the first cell is SQL `NULL`.
157 pub async fn fetch_scalar<T: RowValue>(&self, params: &[&dyn ToSqlParam]) -> Result<T> {
158 self.query(params).await?.require_scalar().await
159 }
160
161 /// Fetches a single scalar, allowing NULL as `None`.
162 ///
163 /// # Errors
164 ///
165 /// Returns the error from [`query`](Self::query); SQL `NULL` yields
166 /// `Ok(None)`.
167 pub async fn fetch_optional_scalar<T: RowValue>(
168 &self,
169 params: &[&dyn ToSqlParam],
170 ) -> Result<Option<T>> {
171 self.query(params).await?.scalar().await
172 }
173}
174
175pub(crate) fn encode_params(params: &[&dyn ToSqlParam]) -> Vec<Option<Vec<u8>>> {
176 params.iter().map(|p| p.encode_param()).collect()
177}
178
179// =============================================================================
180// AsyncPreparedStatementOwned — lifetime-free variant
181// =============================================================================
182
183/// Owned-handle variant of [`AsyncPreparedStatement`] that holds an
184/// `Arc<AsyncConnection>` instead of a borrow.
185///
186/// Semantics are identical to [`AsyncPreparedStatement`]. The only
187/// difference is that this variant is `'static` and can therefore live
188/// in structs that can't carry lifetimes — N-API classes, `tokio::spawn`
189/// tasks that outlive the constructor, etc.
#[derive(Debug)]
pub struct AsyncPreparedStatementOwned {
    /// Shared handle to the connection the statement was prepared on; every
    /// execution resolves its transport client through it.
    // NOTE(review): fields drop in declaration order, so this `Arc` is
    // released before `inner`'s Drop-time close runs — confirm the low-level
    // statement does not need this handle to still be alive at that point.
    connection: Arc<AsyncConnection>,
    /// Low-level statement handle; supplies the parameter metadata, column
    /// metadata and SQL text exposed by the accessors.
    inner: LowLevelAsyncPreparedStatement,
    /// Result-column schema, built once in `new` from `inner.columns()`.
    schema: Arc<ResultSchema>,
}
196
197impl AsyncPreparedStatementOwned {
198 #[expect(
199 clippy::unnecessary_wraps,
200 reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
201 )]
202 pub(crate) fn new(
203 connection: Arc<AsyncConnection>,
204 inner: LowLevelAsyncPreparedStatement,
205 ) -> Result<Self> {
206 let schema = build_schema_from_columns(inner.columns());
207 Ok(Self {
208 connection,
209 inner,
210 schema: Arc::new(schema),
211 })
212 }
213
214 /// Number of parameters the statement expects.
215 #[must_use]
216 pub fn param_count(&self) -> usize {
217 self.inner.param_count()
218 }
219
220 /// Parameter type OIDs.
221 #[must_use]
222 pub fn param_types(&self) -> &[Oid] {
223 self.inner.param_types()
224 }
225
226 /// Result-column schema, captured at prepare time.
227 #[must_use]
228 pub fn schema(&self) -> &ResultSchema {
229 &self.schema
230 }
231
232 /// Original SQL text.
233 #[must_use]
234 pub fn sql(&self) -> &str {
235 self.inner.query()
236 }
237
238 /// Executes the statement and returns a materialized `Vec<Row>`.
239 ///
240 /// Unlike [`AsyncPreparedStatement::query`], the owned variant
241 /// returns an owned `Vec<Row>` rather than a streaming
242 /// [`AsyncRowset`]: `AsyncRowset` is itself lifetime-bound to
243 /// the connection's mutex guard, which defeats the purpose of the
244 /// owned wrapper. N-API callers that want streaming should fall
245 /// back to the non-owned `AsyncPreparedStatement` via
246 /// [`AsyncConnection::prepare`] or use the non-streaming query
247 /// methods below.
248 ///
249 /// # Errors
250 ///
251 /// - Returns [`Error::Other`] on gRPC transport.
252 /// - Returns [`Error::Client`] if the server rejects `Bind` or
253 /// `Execute`, or raises a runtime error while streaming.
254 /// - Returns [`Error::Io`] on transport-level I/O failures.
255 pub async fn fetch_all(&self, params: &[&dyn ToSqlParam]) -> Result<Vec<Row>> {
256 let encoded = encode_params(params);
257 let client = async_tcp_client_arc(&self.connection)?;
258 let stream = client
259 .execute_prepared_streaming(
260 &self.inner,
261 encoded,
262 crate::result::DEFAULT_BINARY_CHUNK_SIZE,
263 )
264 .await?;
265 let rowset = AsyncRowset::from_prepared(stream);
266 rowset.collect_rows().await
267 }
268
269 /// Executes the statement as a command; returns the affected-row count.
270 ///
271 /// # Errors
272 ///
273 /// - Returns [`Error::Other`] on gRPC transport.
274 /// - Returns [`Error::Client`] if the server rejects `Bind` or
275 /// `Execute`.
276 /// - Returns [`Error::Io`] on transport-level I/O failures.
277 pub async fn execute(&self, params: &[&dyn ToSqlParam]) -> Result<u64> {
278 let encoded = encode_params(params);
279 let client = async_tcp_client_arc(&self.connection)?;
280 Ok(client
281 .execute_prepared_no_result(&self.inner, encoded)
282 .await?)
283 }
284
285 /// Fetches exactly one row; errors on empty.
286 ///
287 /// # Errors
288 ///
289 /// - Returns the error from [`fetch_all`](Self::fetch_all).
290 /// - Returns [`Error::Other`] with message `"Query returned no rows"`
291 /// if the result is empty.
292 pub async fn fetch_one(&self, params: &[&dyn ToSqlParam]) -> Result<Row> {
293 self.fetch_all(params)
294 .await?
295 .into_iter()
296 .next()
297 .ok_or_else(|| crate::error::Error::new("Query returned no rows"))
298 }
299
300 /// Fetches at most one row; `None` on empty.
301 ///
302 /// # Errors
303 ///
304 /// Returns the error from [`fetch_all`](Self::fetch_all); an empty
305 /// result yields `Ok(None)`.
306 pub async fn fetch_optional(&self, params: &[&dyn ToSqlParam]) -> Result<Option<Row>> {
307 Ok(self.fetch_all(params).await?.into_iter().next())
308 }
309
310 /// Fetches the first column of the first row as `T`.
311 ///
312 /// # Errors
313 ///
314 /// - Returns the error from [`fetch_one`](Self::fetch_one).
315 /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
316 /// if the first cell is SQL `NULL`.
317 pub async fn fetch_scalar<T: RowValue>(&self, params: &[&dyn ToSqlParam]) -> Result<T> {
318 let row = self.fetch_one(params).await?;
319 row.get::<T>(0)
320 .ok_or_else(|| crate::error::Error::new("Scalar query returned NULL"))
321 }
322
323 /// Fetches the first column of the first row as `Option<T>`.
324 ///
325 /// # Errors
326 ///
327 /// Returns the error from [`fetch_optional`](Self::fetch_optional);
328 /// SQL `NULL` yields `Ok(None)`.
329 pub async fn fetch_optional_scalar<T: RowValue>(
330 &self,
331 params: &[&dyn ToSqlParam],
332 ) -> Result<Option<T>> {
333 Ok(self
334 .fetch_optional(params)
335 .await?
336 .and_then(|r| r.get::<T>(0)))
337 }
338
339 /// Explicitly close the statement on the server.
340 ///
341 /// Equivalent to dropping the struct — the inner
342 /// `hyperdb_api_core::client::AsyncPreparedStatement` has its own Drop-time
343 /// best-effort close.
344 pub fn close(self) {
345 drop(self);
346 }
347}
348
349fn async_tcp_client_arc(
350 connection: &Arc<AsyncConnection>,
351) -> Result<&hyperdb_api_core::client::AsyncClient> {
352 match connection.transport() {
353 AsyncTransport::Tcp(tcp) => Ok(&tcp.client),
354 AsyncTransport::Grpc(_) => Err(Error::new(
355 "prepared statements are not supported over gRPC transport",
356 )),
357 }
358}
359
360pub(crate) fn async_tcp_client(
361 connection: &AsyncConnection,
362) -> Result<&hyperdb_api_core::client::AsyncClient> {
363 match connection.transport() {
364 AsyncTransport::Tcp(tcp) => Ok(&tcp.client),
365 AsyncTransport::Grpc(_) => Err(Error::new(
366 "prepared statements are not supported over gRPC transport",
367 )),
368 }
369}
370
371fn build_schema_from_columns(cols: &[hyperdb_api_core::client::Column]) -> ResultSchema {
372 let columns = cols
373 .iter()
374 .enumerate()
375 .map(|(idx, col)| {
376 let sql_type = hyperdb_api_core::types::SqlType::from_oid_and_modifier(
377 col.type_oid().0,
378 col.type_modifier(),
379 );
380 ResultColumn::new(col.name(), sql_type, idx)
381 })
382 .collect();
383 ResultSchema::from_columns(columns)
384}