// hyperdb_api_core/client/async_stream_query.rs
// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Streaming async query results.
//!
//! This module is the async mirror of the sync [`QueryStream`](super::client::QueryStream):
//! it yields rows in chunks so callers can process arbitrarily large result
//! sets with constant memory. The lifetime parameter ties the stream to the
//! owning [`AsyncClient`](super::async_client::AsyncClient) — the connection
//! mutex is held for the duration of iteration.
use tokio::sync::MutexGuard;
use tracing::warn;

use crate::protocol::message::backend::Message;

use super::async_connection::AsyncRawConnection;
use super::async_stream::AsyncStream;
use super::cancel::Cancellable;
use super::connection::parse_error_response;
use super::error::Result;
use super::row::StreamRow;
use super::statement::{Column, ColumnFormat};
/// Streaming iterator for query results without materializing all rows (async).
///
/// Holding an `AsyncQueryStream` keeps the underlying
/// [`AsyncRawConnection`] locked via a `MutexGuard`. Dropping the stream
/// before fully iterating triggers a server-side cancel (see [`Drop`] below)
/// so the connection is returned to a usable state.
pub struct AsyncQueryStream<'a> {
    /// Guarded connection. Becomes `None` once the terminal `ReadyForQuery`
    /// (or an error) has been consumed, releasing the lock back to the client.
    conn: Option<MutexGuard<'a, AsyncRawConnection<AsyncStream>>>,
    /// Best-effort cancel handle. For [`AsyncClient`](super::async_client::AsyncClient)
    /// this is a thin wrapper that opens a synchronous TCP/UDS/Named-Pipe
    /// connection and sends a `CancelRequest` — the same pattern used by the
    /// sync [`Client`](super::client::Client). Using a sync cancel path
    /// lets `Drop` issue cancels without needing a tokio runtime handle.
    canceller: &'a dyn Cancellable,
    /// Set when the result stream has ended (normally or with an error);
    /// `next_chunk` then short-circuits to `Ok(None)`.
    finished: bool,
    /// Maximum number of rows returned per `next_chunk` call (clamped to
    /// at least 1 in `new`).
    chunk_size: usize,
    /// Column metadata, populated from the first `RowDescription` message.
    schema: Option<Vec<Column>>,
    /// Guards `schema` against being overwritten by any subsequent
    /// `RowDescription` message.
    schema_read: bool,
}
45impl std::fmt::Debug for AsyncQueryStream<'_> {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("AsyncQueryStream")
48            .field("finished", &self.finished)
49            .field("chunk_size", &self.chunk_size)
50            .field("schema_read", &self.schema_read)
51            .finish_non_exhaustive()
52    }
53}
55impl<'a> AsyncQueryStream<'a> {
56    /// Constructs a new streaming result reader. The caller is responsible
57    /// for having already issued the query bytes (see
58    /// [`AsyncRawConnection::start_query_binary`]).
59    pub(crate) fn new(
60        conn: MutexGuard<'a, AsyncRawConnection<AsyncStream>>,
61        canceller: &'a dyn Cancellable,
62        chunk_size: usize,
63    ) -> Self {
64        Self {
65            conn: Some(conn),
66            canceller,
67            finished: false,
68            chunk_size: chunk_size.max(1),
69            schema: None,
70            schema_read: false,
71        }
72    }
73
74    /// Returns the schema (column metadata) for the result set, once the
75    /// first `RowDescription` has been read.
76    #[must_use]
77    pub fn schema(&self) -> Option<&[Column]> {
78        self.schema.as_deref()
79    }
80
81    /// Retrieves the next chunk of rows (up to `chunk_size`).
82    ///
83    /// Returns `Ok(Some(rows))` for each chunk, `Ok(None)` after the final
84    /// `ReadyForQuery`, and `Err(_)` if the server sent an `ErrorResponse`.
85    ///
86    /// # Errors
87    ///
88    /// - Returns `Error` (I/O) / `Error` (closed) if the async
89    ///   transport fails while awaiting the next protocol message.
90    /// - Returns `Error` (server) when the server sends an
91    ///   `ErrorResponse` partway through the result stream.
92    pub async fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
93        if self.finished {
94            return Ok(None);
95        }
96
97        let Some(conn) = self.conn.as_mut() else {
98            return Ok(None);
99        };
100
101        let mut rows = Vec::with_capacity(self.chunk_size);
102        while rows.len() < self.chunk_size {
103            let msg = conn.read_message().await?;
104            match msg {
105                Message::RowDescription(desc) if !self.schema_read => {
106                    let mut cols = Vec::new();
107                    for f in desc.fields().filter_map(std::result::Result::ok) {
108                        cols.push(Column::new(
109                            f.name().to_string(),
110                            f.type_oid(),
111                            f.type_modifier(),
112                            ColumnFormat::from_code(f.format()),
113                        ));
114                    }
115                    self.schema = Some(cols);
116                    self.schema_read = true;
117                }
118                Message::DataRow(data) => {
119                    rows.push(StreamRow::new(data));
120                    if rows.len() >= self.chunk_size {
121                        return Ok(Some(rows));
122                    }
123                }
124                Message::ReadyForQuery(_) => {
125                    self.finished = true;
126                    self.conn = None;
127                    return if rows.is_empty() {
128                        Ok(None)
129                    } else {
130                        Ok(Some(rows))
131                    };
132                }
133                Message::ErrorResponse(body) => {
134                    self.finished = true;
135                    let err = match self.conn {
136                        Some(ref mut c) => c.consume_error(&body).await,
137                        None => parse_error_response(&body),
138                    };
139                    self.conn = None;
140                    return Err(err);
141                }
142                _ => {}
143            }
144        }
145        Ok(Some(rows))
146    }
147}
149impl Drop for AsyncQueryStream<'_> {
150    fn drop(&mut self) {
151        // If the stream was fully consumed, the server already emitted
152        // `ReadyForQuery` and the connection is clean — nothing to do.
153        if self.finished {
154            return;
155        }
156
157        // Otherwise: fire a best-effort cancel so the server stops producing
158        // rows for a query we no longer care about. `Cancellable::cancel` is
159        // implemented synchronously (short-lived fresh TCP/UDS connection),
160        // so it is safe to call from `Drop` without a runtime handle.
161        self.canceller.cancel();
162
163        // We cannot drain the trailing `ErrorResponse(QueryCanceled) +
164        // ReadyForQuery` here because draining requires `await`. Without the
165        // drain the connection is left desynchronized. Mark it as such so
166        // the next operation short-circuits with a clear error rather than
167        // hanging or misinterpreting trailing messages.
168        //
169        // This is a deliberate trade-off: async `Drop` is constrained, and
170        // spawning a drain task (which would need `tokio::runtime::Handle`)
171        // is fragile when the handle is unavailable. Marking desynchronized
172        // ensures the connection is discarded cleanly on next use.
173        if let Some(ref mut conn) = self.conn {
174            conn.mark_desynchronized();
175            warn!(
176                target: "hyperdb_api_core::client",
177                "AsyncQueryStream dropped before completion; \
178                 connection marked desynchronized — discard and reconnect",
179            );
180        }
181    }
182}