Skip to main content

hyperdb_api_core/client/
prepared_stream.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Streaming prepared-statement execution.
5//!
6//! [`PreparedQueryStream`] is the analog of
7//! [`QueryStream`](super::client::QueryStream) for prepared statements: it
8//! drives the Bind/Execute/Sync response and yields rows in chunks so
9//! callers can process arbitrarily large result sets with constant memory.
10//!
11//! Unlike `QueryStream`, the schema is known at construction time — it was
12//! captured when the statement was prepared via Describe('S'), before any
13//! rows are fetched. This makes `schema()` available *before* the first
14//! `next_chunk` call.
15
16use std::sync::{Arc, MutexGuard};
17
18use crate::protocol::message::backend::Message;
19use tracing::warn;
20
21use super::cancel::Cancellable;
22use super::connection::{parse_error_response, RawConnection};
23use super::error::Result;
24use super::row::StreamRow;
25use super::statement::Column;
26use super::sync_stream::SyncStream;
27
28/// Bounded drain budget used by [`PreparedQueryStream::drop`] after
29/// sending a best-effort cancel. Matches the value inlined by
30/// [`QueryStream`](super::client::QueryStream) — see that type's `Drop`
31/// impl for the rationale.
32const POST_CANCEL_DRAIN_CAP: usize = 1024;
33
34/// Streaming iterator for prepared-statement results without materializing
35/// all rows.
36///
37/// Holding a `PreparedQueryStream` keeps the underlying connection locked
38/// via a `MutexGuard`. Dropping the stream before fully iterating triggers
39/// a server-side cancel (see [`Drop`] below) so the connection is returned
40/// to a usable state.
41pub struct PreparedQueryStream<'a> {
42    conn: Option<MutexGuard<'a, RawConnection<SyncStream>>>,
43    /// Best-effort cancel handle — see
44    /// [`QueryStream`](super::client::QueryStream) for the full rationale.
45    canceller: &'a dyn Cancellable,
46    finished: bool,
47    chunk_size: usize,
48    /// Column metadata carried through from the statement's Describe pass.
49    /// Shared with each [`StreamRow`](super::row::StreamRow) via `Arc` so
50    /// schema-dependent getters have cheap access.
51    columns: Arc<Vec<Column>>,
52}
53
54impl std::fmt::Debug for PreparedQueryStream<'_> {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("PreparedQueryStream")
57            .field("finished", &self.finished)
58            .field("chunk_size", &self.chunk_size)
59            .field("column_count", &self.columns.len())
60            .finish_non_exhaustive()
61    }
62}
63
64impl<'a> PreparedQueryStream<'a> {
65    /// Constructs a new streaming reader. The caller must have already
66    /// issued the Bind/Execute/Sync via
67    /// [`RawConnection::start_execute_prepared`].
68    pub(crate) fn new(
69        conn: MutexGuard<'a, RawConnection<SyncStream>>,
70        canceller: &'a dyn Cancellable,
71        chunk_size: usize,
72        columns: Arc<Vec<Column>>,
73    ) -> Self {
74        Self {
75            conn: Some(conn),
76            canceller,
77            finished: false,
78            chunk_size: chunk_size.max(1),
79            columns,
80        }
81    }
82
83    /// Returns the result schema (column metadata). Always available —
84    /// the columns were captured at prepare time.
85    #[must_use]
86    pub fn schema(&self) -> &[Column] {
87        &self.columns
88    }
89
90    /// Retrieves the next chunk of rows (up to `chunk_size`).
91    ///
92    /// Returns `Ok(Some(rows))` for each chunk, `Ok(None)` after the final
93    /// `ReadyForQuery`, and `Err(_)` if the server sent an `ErrorResponse`.
94    ///
95    /// # Errors
96    ///
97    /// - Returns `Error` (I/O) / `Error` (closed) if the transport
98    ///   fails while awaiting the next protocol message.
99    /// - Returns `Error` (server) when the server sends an
100    ///   `ErrorResponse` partway through the result stream.
101    pub fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
102        if self.finished {
103            return Ok(None);
104        }
105
106        let Some(conn) = self.conn.as_mut() else {
107            return Ok(None);
108        };
109
110        let mut rows = Vec::with_capacity(self.chunk_size);
111        while rows.len() < self.chunk_size {
112            let msg = conn.read_message()?;
113            match msg {
114                Message::BindComplete => {
115                    // Bind succeeded — expected once at the start.
116                }
117                Message::DataRow(data) => {
118                    rows.push(StreamRow::new(data));
119                    if rows.len() >= self.chunk_size {
120                        return Ok(Some(rows));
121                    }
122                }
123                Message::CommandComplete(_) | Message::EmptyQueryResponse => {}
124                Message::ReadyForQuery(_) => {
125                    self.finished = true;
126                    self.conn = None;
127                    return if rows.is_empty() {
128                        Ok(None)
129                    } else {
130                        Ok(Some(rows))
131                    };
132                }
133                Message::ErrorResponse(body) => {
134                    self.finished = true;
135                    let err = match self.conn {
136                        Some(ref mut c) => c.consume_error(&body),
137                        None => parse_error_response(&body),
138                    };
139                    self.conn = None;
140                    return Err(err);
141                }
142                _ => {}
143            }
144        }
145        Ok(Some(rows))
146    }
147}
148
149impl Drop for PreparedQueryStream<'_> {
150    fn drop(&mut self) {
151        if self.finished {
152            return;
153        }
154
155        // Mirror QueryStream's drop: fire-and-forget cancel, then drain the
156        // trailing `ErrorResponse(QueryCanceled) + ReadyForQuery` on a
157        // bounded budget so the connection is returned to the pool cleanly.
158        self.canceller.cancel();
159
160        if let Some(ref mut conn) = self.conn {
161            let _ok = conn.drain_until_ready_bounded(POST_CANCEL_DRAIN_CAP);
162        }
163
164        // Belt-and-braces in case the drain exceeds its budget: the drain
165        // itself flips `desynchronized`, so a future call that sees the
166        // connection will short-circuit with a clear error.
167        if self.conn.as_ref().is_some_and(|c| !c.is_healthy()) {
168            warn!(
169                target: "hyperdb_api_core::client",
170                "PreparedQueryStream dropped before completion and drain exceeded budget; \
171                 connection marked desynchronized — discard and reconnect",
172            );
173        }
174    }
175}