Skip to main content

hyperdb_api_core/client/
async_prepared_stream.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Streaming async prepared-statement execution.
5//!
6//! Async mirror of [`PreparedQueryStream`](super::prepared_stream::PreparedQueryStream).
7//! See that module's docs for the design rationale.
8
9use std::sync::Arc;
10
11use crate::protocol::message::backend::Message;
12use tokio::sync::MutexGuard;
13use tracing::warn;
14
15use super::async_connection::AsyncRawConnection;
16use super::async_stream::AsyncStream;
17use super::cancel::Cancellable;
18use super::connection::parse_error_response;
19use super::error::Result;
20use super::row::StreamRow;
21use super::statement::Column;
22
23/// Streaming iterator for prepared-statement results without materializing
24/// all rows (async).
25///
26/// Holding an `AsyncPreparedQueryStream` keeps the underlying connection
27/// locked via a `MutexGuard`. Dropping the stream before fully iterating
28/// triggers a best-effort server-side cancel and marks the connection
29/// desynchronized — same Drop semantics as
30/// [`AsyncQueryStream`](super::async_stream_query::AsyncQueryStream).
31pub struct AsyncPreparedQueryStream<'a> {
32    conn: Option<MutexGuard<'a, AsyncRawConnection<AsyncStream>>>,
33    canceller: &'a dyn Cancellable,
34    finished: bool,
35    chunk_size: usize,
36    /// Columns captured at prepare time.
37    columns: Arc<Vec<Column>>,
38}
39
40impl std::fmt::Debug for AsyncPreparedQueryStream<'_> {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("AsyncPreparedQueryStream")
43            .field("finished", &self.finished)
44            .field("chunk_size", &self.chunk_size)
45            .field("column_count", &self.columns.len())
46            .finish_non_exhaustive()
47    }
48}
49
50impl<'a> AsyncPreparedQueryStream<'a> {
51    pub(crate) fn new(
52        conn: MutexGuard<'a, AsyncRawConnection<AsyncStream>>,
53        canceller: &'a dyn Cancellable,
54        chunk_size: usize,
55        columns: Arc<Vec<Column>>,
56    ) -> Self {
57        Self {
58            conn: Some(conn),
59            canceller,
60            finished: false,
61            chunk_size: chunk_size.max(1),
62            columns,
63        }
64    }
65
66    /// Returns the result schema (column metadata). Always available —
67    /// the columns were captured at prepare time.
68    #[must_use]
69    pub fn schema(&self) -> &[Column] {
70        &self.columns
71    }
72
73    /// Retrieves the next chunk of rows (up to `chunk_size`).
74    ///
75    /// # Errors
76    ///
77    /// - Returns `Error` (I/O) / `Error` (closed) if the async
78    ///   transport fails while awaiting the next protocol message.
79    /// - Returns `Error` (server) when the server sends an
80    ///   `ErrorResponse` partway through the result stream.
81    pub async fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
82        if self.finished {
83            return Ok(None);
84        }
85
86        let Some(conn) = self.conn.as_mut() else {
87            return Ok(None);
88        };
89
90        let mut rows = Vec::with_capacity(self.chunk_size);
91        while rows.len() < self.chunk_size {
92            let msg = conn.read_message().await?;
93            match msg {
94                Message::BindComplete => {}
95                Message::DataRow(data) => {
96                    rows.push(StreamRow::new(data));
97                    if rows.len() >= self.chunk_size {
98                        return Ok(Some(rows));
99                    }
100                }
101                Message::CommandComplete(_) | Message::EmptyQueryResponse => {}
102                Message::ReadyForQuery(_) => {
103                    self.finished = true;
104                    self.conn = None;
105                    return if rows.is_empty() {
106                        Ok(None)
107                    } else {
108                        Ok(Some(rows))
109                    };
110                }
111                Message::ErrorResponse(body) => {
112                    self.finished = true;
113                    let err = match self.conn {
114                        Some(ref mut c) => c.consume_error(&body).await,
115                        None => parse_error_response(&body),
116                    };
117                    self.conn = None;
118                    return Err(err);
119                }
120                _ => {}
121            }
122        }
123        Ok(Some(rows))
124    }
125}
126
127impl Drop for AsyncPreparedQueryStream<'_> {
128    fn drop(&mut self) {
129        if self.finished {
130            return;
131        }
132
133        // Drop can't await; mark the connection desynchronized and issue
134        // the sync CancelRequest on a fresh transport — same pattern as
135        // AsyncQueryStream::drop.
136        self.canceller.cancel();
137
138        if let Some(ref mut conn) = self.conn {
139            conn.mark_desynchronized();
140            warn!(
141                target: "hyperdb_api_core::client",
142                "AsyncPreparedQueryStream dropped before completion; \
143                 connection marked desynchronized — discard and reconnect",
144            );
145        }
146    }
147}