Skip to main content

hyperdb_api/
async_prepared.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level async prepared statements.
5//!
6//! Async mirror of [`PreparedStatement`](crate::PreparedStatement); see
7//! that type's docs for the design rationale.
8
9use std::sync::Arc;
10
11use hyperdb_api_core::client::AsyncPreparedStatement as LowLevelAsyncPreparedStatement;
12use hyperdb_api_core::types::Oid;
13
14use crate::async_connection::AsyncConnection;
15use crate::async_result::AsyncRowset;
16use crate::async_transport::AsyncTransport;
17use crate::error::{Error, Result};
18use crate::params::ToSqlParam;
19use crate::result::{ResultColumn, ResultSchema, Row, RowValue};
20
21/// A handle to a server-side prepared statement (async).
22///
23/// Construct via [`AsyncConnection::prepare`] or
24/// [`AsyncConnection::prepare_typed`]. Holding this type keeps the
25/// statement allocated on the server; it is released automatically when
26/// the handle is dropped (best-effort — see
27/// [`hyperdb_api_core::client::AsyncPreparedStatement`] for the Drop semantics).
28/// The owned variant [`AsyncPreparedStatementOwned`] also provides an
29/// explicit [`close`](AsyncPreparedStatementOwned::close) method for
30/// callers that want deterministic cleanup.
31#[derive(Debug)]
32pub struct AsyncPreparedStatement<'conn> {
33    connection: &'conn AsyncConnection,
34    inner: LowLevelAsyncPreparedStatement,
35    schema: Arc<ResultSchema>,
36}
37
38impl<'conn> AsyncPreparedStatement<'conn> {
39    #[expect(
40        clippy::unnecessary_wraps,
41        reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
42    )]
43    pub(crate) fn new(
44        connection: &'conn AsyncConnection,
45        inner: LowLevelAsyncPreparedStatement,
46    ) -> Result<Self> {
47        let schema = build_schema_from_columns(inner.columns());
48        Ok(Self {
49            connection,
50            inner,
51            schema: Arc::new(schema),
52        })
53    }
54
55    /// Number of parameters the statement expects.
56    #[must_use]
57    pub fn param_count(&self) -> usize {
58        self.inner.param_count()
59    }
60
61    /// Parameter type OIDs.
62    #[must_use]
63    pub fn param_types(&self) -> &[Oid] {
64        self.inner.param_types()
65    }
66
67    /// Result-column schema, always available (captured at prepare time).
68    #[must_use]
69    pub fn schema(&self) -> &ResultSchema {
70        &self.schema
71    }
72
73    /// The original SQL text.
74    #[must_use]
75    pub fn sql(&self) -> &str {
76        self.inner.query()
77    }
78
79    /// Executes the statement and returns a streaming [`AsyncRowset`].
80    ///
81    /// # Errors
82    ///
83    /// - Returns [`Error::Other`] on gRPC transport.
84    /// - Returns [`Error::Client`] if the server rejects `Bind` or
85    ///   `Execute`.
86    /// - Returns [`Error::Io`] on transport-level I/O failures.
87    pub async fn query(&self, params: &[&dyn ToSqlParam]) -> Result<AsyncRowset<'conn>> {
88        let encoded = encode_params(params);
89        let client = async_tcp_client(self.connection)?;
90        let stream = client
91            .execute_prepared_streaming(
92                &self.inner,
93                encoded,
94                crate::result::DEFAULT_BINARY_CHUNK_SIZE,
95            )
96            .await?;
97        Ok(AsyncRowset::from_prepared(stream))
98    }
99
100    /// Executes the statement as a command and returns the affected-row
101    /// count (async).
102    ///
103    /// # Errors
104    ///
105    /// - Returns [`Error::Other`] on gRPC transport.
106    /// - Returns [`Error::Client`] if the server rejects `Bind` or
107    ///   `Execute`.
108    /// - Returns [`Error::Io`] on transport-level I/O failures.
109    pub async fn execute(&self, params: &[&dyn ToSqlParam]) -> Result<u64> {
110        let encoded = encode_params(params);
111        let client = async_tcp_client(self.connection)?;
112        Ok(client
113            .execute_prepared_no_result(&self.inner, encoded)
114            .await?)
115    }
116
117    /// Fetches exactly one row; errors if the result is empty.
118    ///
119    /// # Errors
120    ///
121    /// - Returns the error from [`query`](Self::query).
122    /// - Returns [`Error::Other`] with message `"Query returned no rows"`
123    ///   if the result is empty.
124    pub async fn fetch_one(&self, params: &[&dyn ToSqlParam]) -> Result<Row> {
125        self.query(params).await?.require_first_row().await
126    }
127
128    /// Fetches at most one row; returns `None` if the result is empty.
129    ///
130    /// # Errors
131    ///
132    /// Returns the error from [`query`](Self::query); an empty result
133    /// yields `Ok(None)`.
134    pub async fn fetch_optional(&self, params: &[&dyn ToSqlParam]) -> Result<Option<Row>> {
135        self.query(params).await?.first_row().await
136    }
137
138    /// Fetches every row into a `Vec`.
139    ///
140    /// # Errors
141    ///
142    /// Returns the error from [`query`](Self::query), or a transport
143    /// error produced while draining every chunk.
144    pub async fn fetch_all(&self, params: &[&dyn ToSqlParam]) -> Result<Vec<Row>> {
145        self.query(params).await?.collect_rows().await
146    }
147
148    /// Fetches a single non-NULL scalar; errors on empty / NULL.
149    ///
150    /// # Errors
151    ///
152    /// - Returns the error from [`query`](Self::query).
153    /// - Returns [`Error::Other`] with message `"Query returned no rows"`
154    ///   if the result is empty.
155    /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
156    ///   if the first cell is SQL `NULL`.
157    pub async fn fetch_scalar<T: RowValue>(&self, params: &[&dyn ToSqlParam]) -> Result<T> {
158        self.query(params).await?.require_scalar().await
159    }
160
161    /// Fetches a single scalar, allowing NULL as `None`.
162    ///
163    /// # Errors
164    ///
165    /// Returns the error from [`query`](Self::query); SQL `NULL` yields
166    /// `Ok(None)`.
167    pub async fn fetch_optional_scalar<T: RowValue>(
168        &self,
169        params: &[&dyn ToSqlParam],
170    ) -> Result<Option<T>> {
171        self.query(params).await?.scalar().await
172    }
173}
174
175pub(crate) fn encode_params(params: &[&dyn ToSqlParam]) -> Vec<Option<Vec<u8>>> {
176    params.iter().map(|p| p.encode_param()).collect()
177}
178
179// =============================================================================
180// AsyncPreparedStatementOwned — lifetime-free variant
181// =============================================================================
182
183/// Owned-handle variant of [`AsyncPreparedStatement`] that holds an
184/// `Arc<AsyncConnection>` instead of a borrow.
185///
186/// Semantics are identical to [`AsyncPreparedStatement`]. The only
187/// difference is that this variant is `'static` and can therefore live
188/// in structs that can't carry lifetimes — N-API classes, `tokio::spawn`
189/// tasks that outlive the constructor, etc.
190#[derive(Debug)]
191pub struct AsyncPreparedStatementOwned {
192    connection: Arc<AsyncConnection>,
193    inner: LowLevelAsyncPreparedStatement,
194    schema: Arc<ResultSchema>,
195}
196
197impl AsyncPreparedStatementOwned {
198    #[expect(
199        clippy::unnecessary_wraps,
200        reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
201    )]
202    pub(crate) fn new(
203        connection: Arc<AsyncConnection>,
204        inner: LowLevelAsyncPreparedStatement,
205    ) -> Result<Self> {
206        let schema = build_schema_from_columns(inner.columns());
207        Ok(Self {
208            connection,
209            inner,
210            schema: Arc::new(schema),
211        })
212    }
213
214    /// Number of parameters the statement expects.
215    #[must_use]
216    pub fn param_count(&self) -> usize {
217        self.inner.param_count()
218    }
219
220    /// Parameter type OIDs.
221    #[must_use]
222    pub fn param_types(&self) -> &[Oid] {
223        self.inner.param_types()
224    }
225
226    /// Result-column schema, captured at prepare time.
227    #[must_use]
228    pub fn schema(&self) -> &ResultSchema {
229        &self.schema
230    }
231
232    /// Original SQL text.
233    #[must_use]
234    pub fn sql(&self) -> &str {
235        self.inner.query()
236    }
237
238    /// Executes the statement and returns a materialized `Vec<Row>`.
239    ///
240    /// Unlike [`AsyncPreparedStatement::query`], the owned variant
241    /// returns an owned `Vec<Row>` rather than a streaming
242    /// [`AsyncRowset`]: `AsyncRowset` is itself lifetime-bound to
243    /// the connection's mutex guard, which defeats the purpose of the
244    /// owned wrapper. N-API callers that want streaming should fall
245    /// back to the non-owned `AsyncPreparedStatement` via
246    /// [`AsyncConnection::prepare`] or use the non-streaming query
247    /// methods below.
248    ///
249    /// # Errors
250    ///
251    /// - Returns [`Error::Other`] on gRPC transport.
252    /// - Returns [`Error::Client`] if the server rejects `Bind` or
253    ///   `Execute`, or raises a runtime error while streaming.
254    /// - Returns [`Error::Io`] on transport-level I/O failures.
255    pub async fn fetch_all(&self, params: &[&dyn ToSqlParam]) -> Result<Vec<Row>> {
256        let encoded = encode_params(params);
257        let client = async_tcp_client_arc(&self.connection)?;
258        let stream = client
259            .execute_prepared_streaming(
260                &self.inner,
261                encoded,
262                crate::result::DEFAULT_BINARY_CHUNK_SIZE,
263            )
264            .await?;
265        let rowset = AsyncRowset::from_prepared(stream);
266        rowset.collect_rows().await
267    }
268
269    /// Executes the statement as a command; returns the affected-row count.
270    ///
271    /// # Errors
272    ///
273    /// - Returns [`Error::Other`] on gRPC transport.
274    /// - Returns [`Error::Client`] if the server rejects `Bind` or
275    ///   `Execute`.
276    /// - Returns [`Error::Io`] on transport-level I/O failures.
277    pub async fn execute(&self, params: &[&dyn ToSqlParam]) -> Result<u64> {
278        let encoded = encode_params(params);
279        let client = async_tcp_client_arc(&self.connection)?;
280        Ok(client
281            .execute_prepared_no_result(&self.inner, encoded)
282            .await?)
283    }
284
285    /// Fetches exactly one row; errors on empty.
286    ///
287    /// # Errors
288    ///
289    /// - Returns the error from [`fetch_all`](Self::fetch_all).
290    /// - Returns [`Error::Other`] with message `"Query returned no rows"`
291    ///   if the result is empty.
292    pub async fn fetch_one(&self, params: &[&dyn ToSqlParam]) -> Result<Row> {
293        self.fetch_all(params)
294            .await?
295            .into_iter()
296            .next()
297            .ok_or_else(|| crate::error::Error::new("Query returned no rows"))
298    }
299
300    /// Fetches at most one row; `None` on empty.
301    ///
302    /// # Errors
303    ///
304    /// Returns the error from [`fetch_all`](Self::fetch_all); an empty
305    /// result yields `Ok(None)`.
306    pub async fn fetch_optional(&self, params: &[&dyn ToSqlParam]) -> Result<Option<Row>> {
307        Ok(self.fetch_all(params).await?.into_iter().next())
308    }
309
310    /// Fetches the first column of the first row as `T`.
311    ///
312    /// # Errors
313    ///
314    /// - Returns the error from [`fetch_one`](Self::fetch_one).
315    /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
316    ///   if the first cell is SQL `NULL`.
317    pub async fn fetch_scalar<T: RowValue>(&self, params: &[&dyn ToSqlParam]) -> Result<T> {
318        let row = self.fetch_one(params).await?;
319        row.get::<T>(0)
320            .ok_or_else(|| crate::error::Error::new("Scalar query returned NULL"))
321    }
322
323    /// Fetches the first column of the first row as `Option<T>`.
324    ///
325    /// # Errors
326    ///
327    /// Returns the error from [`fetch_optional`](Self::fetch_optional);
328    /// SQL `NULL` yields `Ok(None)`.
329    pub async fn fetch_optional_scalar<T: RowValue>(
330        &self,
331        params: &[&dyn ToSqlParam],
332    ) -> Result<Option<T>> {
333        Ok(self
334            .fetch_optional(params)
335            .await?
336            .and_then(|r| r.get::<T>(0)))
337    }
338
339    /// Explicitly close the statement on the server.
340    ///
341    /// Equivalent to dropping the struct — the inner
342    /// `hyperdb_api_core::client::AsyncPreparedStatement` has its own Drop-time
343    /// best-effort close.
344    pub fn close(self) {
345        drop(self);
346    }
347}
348
349fn async_tcp_client_arc(
350    connection: &Arc<AsyncConnection>,
351) -> Result<&hyperdb_api_core::client::AsyncClient> {
352    match connection.transport() {
353        AsyncTransport::Tcp(tcp) => Ok(&tcp.client),
354        AsyncTransport::Grpc(_) => Err(Error::new(
355            "prepared statements are not supported over gRPC transport",
356        )),
357    }
358}
359
360pub(crate) fn async_tcp_client(
361    connection: &AsyncConnection,
362) -> Result<&hyperdb_api_core::client::AsyncClient> {
363    match connection.transport() {
364        AsyncTransport::Tcp(tcp) => Ok(&tcp.client),
365        AsyncTransport::Grpc(_) => Err(Error::new(
366            "prepared statements are not supported over gRPC transport",
367        )),
368    }
369}
370
371fn build_schema_from_columns(cols: &[hyperdb_api_core::client::Column]) -> ResultSchema {
372    let columns = cols
373        .iter()
374        .enumerate()
375        .map(|(idx, col)| {
376            let sql_type = hyperdb_api_core::types::SqlType::from_oid_and_modifier(
377                col.type_oid().0,
378                col.type_modifier(),
379            );
380            ResultColumn::new(col.name(), sql_type, idx)
381        })
382        .collect();
383    ResultSchema::from_columns(columns)
384}