hyperdb_api_core/client/prepared_stream.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Streaming prepared-statement execution.
5//!
6//! [`PreparedQueryStream`] is the analog of
7//! [`QueryStream`](super::client::QueryStream) for prepared statements: it
8//! drives the Bind/Execute/Sync response and yields rows in chunks so
9//! callers can process arbitrarily large result sets with constant memory.
10//!
11//! Unlike `QueryStream`, the schema is known at construction time — it was
12//! captured when the statement was prepared via Describe('S'), before any
13//! rows are fetched. This makes `schema()` available *before* the first
14//! `next_chunk` call.
15
16use std::sync::{Arc, MutexGuard};
17
18use crate::protocol::message::backend::Message;
19use tracing::warn;
20
21use super::cancel::Cancellable;
22use super::connection::{parse_error_response, RawConnection};
23use super::error::Result;
24use super::row::StreamRow;
25use super::statement::Column;
26use super::sync_stream::SyncStream;
27
/// Budget handed to the bounded drain that [`PreparedQueryStream::drop`]
/// runs once its best-effort cancel has been sent. Kept equal to the value
/// inlined by [`QueryStream`](super::client::QueryStream); that type's
/// `Drop` impl documents why the drain is bounded.
const POST_CANCEL_DRAIN_CAP: usize = 1_024;
33
34/// Streaming iterator for prepared-statement results without materializing
35/// all rows.
36///
37/// Holding a `PreparedQueryStream` keeps the underlying connection locked
38/// via a `MutexGuard`. Dropping the stream before fully iterating triggers
39/// a server-side cancel (see [`Drop`] below) so the connection is returned
40/// to a usable state.
41pub struct PreparedQueryStream<'a> {
42 conn: Option<MutexGuard<'a, RawConnection<SyncStream>>>,
43 /// Best-effort cancel handle — see
44 /// [`QueryStream`](super::client::QueryStream) for the full rationale.
45 canceller: &'a dyn Cancellable,
46 finished: bool,
47 chunk_size: usize,
48 /// Column metadata carried through from the statement's Describe pass.
49 /// Shared with each [`StreamRow`](super::row::StreamRow) via `Arc` so
50 /// schema-dependent getters have cheap access.
51 columns: Arc<Vec<Column>>,
52}
53
54impl std::fmt::Debug for PreparedQueryStream<'_> {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("PreparedQueryStream")
57 .field("finished", &self.finished)
58 .field("chunk_size", &self.chunk_size)
59 .field("column_count", &self.columns.len())
60 .finish_non_exhaustive()
61 }
62}
63
64impl<'a> PreparedQueryStream<'a> {
65 /// Constructs a new streaming reader. The caller must have already
66 /// issued the Bind/Execute/Sync via
67 /// [`RawConnection::start_execute_prepared`].
68 pub(crate) fn new(
69 conn: MutexGuard<'a, RawConnection<SyncStream>>,
70 canceller: &'a dyn Cancellable,
71 chunk_size: usize,
72 columns: Arc<Vec<Column>>,
73 ) -> Self {
74 Self {
75 conn: Some(conn),
76 canceller,
77 finished: false,
78 chunk_size: chunk_size.max(1),
79 columns,
80 }
81 }
82
83 /// Returns the result schema (column metadata). Always available —
84 /// the columns were captured at prepare time.
85 #[must_use]
86 pub fn schema(&self) -> &[Column] {
87 &self.columns
88 }
89
90 /// Retrieves the next chunk of rows (up to `chunk_size`).
91 ///
92 /// Returns `Ok(Some(rows))` for each chunk, `Ok(None)` after the final
93 /// `ReadyForQuery`, and `Err(_)` if the server sent an `ErrorResponse`.
94 ///
95 /// # Errors
96 ///
97 /// - Returns `Error` (I/O) / `Error` (closed) if the transport
98 /// fails while awaiting the next protocol message.
99 /// - Returns `Error` (server) when the server sends an
100 /// `ErrorResponse` partway through the result stream.
101 pub fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
102 if self.finished {
103 return Ok(None);
104 }
105
106 let Some(conn) = self.conn.as_mut() else {
107 return Ok(None);
108 };
109
110 let mut rows = Vec::with_capacity(self.chunk_size);
111 while rows.len() < self.chunk_size {
112 let msg = conn.read_message()?;
113 match msg {
114 Message::BindComplete => {
115 // Bind succeeded — expected once at the start.
116 }
117 Message::DataRow(data) => {
118 rows.push(StreamRow::new(data));
119 if rows.len() >= self.chunk_size {
120 return Ok(Some(rows));
121 }
122 }
123 Message::CommandComplete(_) | Message::EmptyQueryResponse => {}
124 Message::ReadyForQuery(_) => {
125 self.finished = true;
126 self.conn = None;
127 return if rows.is_empty() {
128 Ok(None)
129 } else {
130 Ok(Some(rows))
131 };
132 }
133 Message::ErrorResponse(body) => {
134 self.finished = true;
135 let err = match self.conn {
136 Some(ref mut c) => c.consume_error(&body),
137 None => parse_error_response(&body),
138 };
139 self.conn = None;
140 return Err(err);
141 }
142 _ => {}
143 }
144 }
145 Ok(Some(rows))
146 }
147}
148
149impl Drop for PreparedQueryStream<'_> {
150 fn drop(&mut self) {
151 if self.finished {
152 return;
153 }
154
155 // Mirror QueryStream's drop: fire-and-forget cancel, then drain the
156 // trailing `ErrorResponse(QueryCanceled) + ReadyForQuery` on a
157 // bounded budget so the connection is returned to the pool cleanly.
158 self.canceller.cancel();
159
160 if let Some(ref mut conn) = self.conn {
161 let _ok = conn.drain_until_ready_bounded(POST_CANCEL_DRAIN_CAP);
162 }
163
164 // Belt-and-braces in case the drain exceeds its budget: the drain
165 // itself flips `desynchronized`, so a future call that sees the
166 // connection will short-circuit with a clear error.
167 if self.conn.as_ref().is_some_and(|c| !c.is_healthy()) {
168 warn!(
169 target: "hyperdb_api_core::client",
170 "PreparedQueryStream dropped before completion and drain exceeded budget; \
171 connection marked desynchronized — discard and reconnect",
172 );
173 }
174 }
175}