hyperdb_api_core/client/async_stream_query.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Streaming async query results.
5//!
6//! This module is the async mirror of the sync [`QueryStream`](super::client::QueryStream):
7//! it yields rows in chunks so callers can process arbitrarily large result
8//! sets with constant memory. The lifetime parameter ties the stream to the
9//! owning [`AsyncClient`](super::async_client::AsyncClient) — the connection
10//! mutex is held for the duration of iteration.
11
12use tokio::sync::MutexGuard;
13use tracing::warn;
14
15use crate::protocol::message::backend::Message;
16
17use super::async_connection::AsyncRawConnection;
18use super::async_stream::AsyncStream;
19use super::cancel::Cancellable;
20use super::connection::parse_error_response;
21use super::error::Result;
22use super::row::StreamRow;
23use super::statement::{Column, ColumnFormat};
24
25/// Streaming iterator for query results without materializing all rows (async).
26///
27/// Holding an `AsyncQueryStream` keeps the underlying
28/// [`AsyncRawConnection`] locked via a `MutexGuard`. Dropping the stream
29/// before fully iterating triggers a server-side cancel (see [`Drop`] below)
30/// so the connection is returned to a usable state.
31pub struct AsyncQueryStream<'a> {
32 conn: Option<MutexGuard<'a, AsyncRawConnection<AsyncStream>>>,
33 /// Best-effort cancel handle. For [`AsyncClient`](super::async_client::AsyncClient)
34 /// this is a thin wrapper that opens a synchronous TCP/UDS/Named-Pipe
35 /// connection and sends a `CancelRequest` — the same pattern used by the
36 /// sync [`Client`](super::client::Client). Using a sync cancel path
37 /// lets `Drop` issue cancels without needing a tokio runtime handle.
38 canceller: &'a dyn Cancellable,
39 finished: bool,
40 chunk_size: usize,
41 schema: Option<Vec<Column>>,
42 schema_read: bool,
43}
44
45impl std::fmt::Debug for AsyncQueryStream<'_> {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("AsyncQueryStream")
48 .field("finished", &self.finished)
49 .field("chunk_size", &self.chunk_size)
50 .field("schema_read", &self.schema_read)
51 .finish_non_exhaustive()
52 }
53}
54
55impl<'a> AsyncQueryStream<'a> {
56 /// Constructs a new streaming result reader. The caller is responsible
57 /// for having already issued the query bytes (see
58 /// [`AsyncRawConnection::start_query_binary`]).
59 pub(crate) fn new(
60 conn: MutexGuard<'a, AsyncRawConnection<AsyncStream>>,
61 canceller: &'a dyn Cancellable,
62 chunk_size: usize,
63 ) -> Self {
64 Self {
65 conn: Some(conn),
66 canceller,
67 finished: false,
68 chunk_size: chunk_size.max(1),
69 schema: None,
70 schema_read: false,
71 }
72 }
73
74 /// Returns the schema (column metadata) for the result set, once the
75 /// first `RowDescription` has been read.
76 #[must_use]
77 pub fn schema(&self) -> Option<&[Column]> {
78 self.schema.as_deref()
79 }
80
81 /// Retrieves the next chunk of rows (up to `chunk_size`).
82 ///
83 /// Returns `Ok(Some(rows))` for each chunk, `Ok(None)` after the final
84 /// `ReadyForQuery`, and `Err(_)` if the server sent an `ErrorResponse`.
85 ///
86 /// # Errors
87 ///
88 /// - Returns `Error` (I/O) / `Error` (closed) if the async
89 /// transport fails while awaiting the next protocol message.
90 /// - Returns `Error` (server) when the server sends an
91 /// `ErrorResponse` partway through the result stream.
92 pub async fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
93 if self.finished {
94 return Ok(None);
95 }
96
97 let Some(conn) = self.conn.as_mut() else {
98 return Ok(None);
99 };
100
101 let mut rows = Vec::with_capacity(self.chunk_size);
102 while rows.len() < self.chunk_size {
103 let msg = conn.read_message().await?;
104 match msg {
105 Message::RowDescription(desc) if !self.schema_read => {
106 let mut cols = Vec::new();
107 for f in desc.fields().filter_map(std::result::Result::ok) {
108 cols.push(Column::new(
109 f.name().to_string(),
110 f.type_oid(),
111 f.type_modifier(),
112 ColumnFormat::from_code(f.format()),
113 ));
114 }
115 self.schema = Some(cols);
116 self.schema_read = true;
117 }
118 Message::DataRow(data) => {
119 rows.push(StreamRow::new(data));
120 if rows.len() >= self.chunk_size {
121 return Ok(Some(rows));
122 }
123 }
124 Message::ReadyForQuery(_) => {
125 self.finished = true;
126 self.conn = None;
127 return if rows.is_empty() {
128 Ok(None)
129 } else {
130 Ok(Some(rows))
131 };
132 }
133 Message::ErrorResponse(body) => {
134 self.finished = true;
135 let err = match self.conn {
136 Some(ref mut c) => c.consume_error(&body).await,
137 None => parse_error_response(&body),
138 };
139 self.conn = None;
140 return Err(err);
141 }
142 _ => {}
143 }
144 }
145 Ok(Some(rows))
146 }
147}
148
149impl Drop for AsyncQueryStream<'_> {
150 fn drop(&mut self) {
151 // If the stream was fully consumed, the server already emitted
152 // `ReadyForQuery` and the connection is clean — nothing to do.
153 if self.finished {
154 return;
155 }
156
157 // Otherwise: fire a best-effort cancel so the server stops producing
158 // rows for a query we no longer care about. `Cancellable::cancel` is
159 // implemented synchronously (short-lived fresh TCP/UDS connection),
160 // so it is safe to call from `Drop` without a runtime handle.
161 self.canceller.cancel();
162
163 // We cannot drain the trailing `ErrorResponse(QueryCanceled) +
164 // ReadyForQuery` here because draining requires `await`. Without the
165 // drain the connection is left desynchronized. Mark it as such so
166 // the next operation short-circuits with a clear error rather than
167 // hanging or misinterpreting trailing messages.
168 //
169 // This is a deliberate trade-off: async `Drop` is constrained, and
170 // spawning a drain task (which would need `tokio::runtime::Handle`)
171 // is fragile when the handle is unavailable. Marking desynchronized
172 // ensures the connection is discarded cleanly on next use.
173 if let Some(ref mut conn) = self.conn {
174 conn.mark_desynchronized();
175 warn!(
176 target: "hyperdb_api_core::client",
177 "AsyncQueryStream dropped before completion; \
178 connection marked desynchronized — discard and reconnect",
179 );
180 }
181 }
182}