hyperdb_api_core/client/
async_prepared_stream.rs1use std::sync::Arc;
10
11use crate::protocol::message::backend::Message;
12use tokio::sync::MutexGuard;
13use tracing::warn;
14
15use super::async_connection::AsyncRawConnection;
16use super::async_stream::AsyncStream;
17use super::cancel::Cancellable;
18use super::connection::parse_error_response;
19use super::error::Result;
20use super::row::StreamRow;
21use super::statement::Column;
22
23pub struct AsyncPreparedQueryStream<'a> {
32 conn: Option<MutexGuard<'a, AsyncRawConnection<AsyncStream>>>,
33 canceller: &'a dyn Cancellable,
34 finished: bool,
35 chunk_size: usize,
36 columns: Arc<Vec<Column>>,
38}
39
40impl std::fmt::Debug for AsyncPreparedQueryStream<'_> {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("AsyncPreparedQueryStream")
43 .field("finished", &self.finished)
44 .field("chunk_size", &self.chunk_size)
45 .field("column_count", &self.columns.len())
46 .finish_non_exhaustive()
47 }
48}
49
50impl<'a> AsyncPreparedQueryStream<'a> {
51 pub(crate) fn new(
52 conn: MutexGuard<'a, AsyncRawConnection<AsyncStream>>,
53 canceller: &'a dyn Cancellable,
54 chunk_size: usize,
55 columns: Arc<Vec<Column>>,
56 ) -> Self {
57 Self {
58 conn: Some(conn),
59 canceller,
60 finished: false,
61 chunk_size: chunk_size.max(1),
62 columns,
63 }
64 }
65
66 #[must_use]
69 pub fn schema(&self) -> &[Column] {
70 &self.columns
71 }
72
73 pub async fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
82 if self.finished {
83 return Ok(None);
84 }
85
86 let Some(conn) = self.conn.as_mut() else {
87 return Ok(None);
88 };
89
90 let mut rows = Vec::with_capacity(self.chunk_size);
91 while rows.len() < self.chunk_size {
92 let msg = conn.read_message().await?;
93 match msg {
94 Message::BindComplete => {}
95 Message::DataRow(data) => {
96 rows.push(StreamRow::new(data));
97 if rows.len() >= self.chunk_size {
98 return Ok(Some(rows));
99 }
100 }
101 Message::CommandComplete(_) | Message::EmptyQueryResponse => {}
102 Message::ReadyForQuery(_) => {
103 self.finished = true;
104 self.conn = None;
105 return if rows.is_empty() {
106 Ok(None)
107 } else {
108 Ok(Some(rows))
109 };
110 }
111 Message::ErrorResponse(body) => {
112 self.finished = true;
113 let err = match self.conn {
114 Some(ref mut c) => c.consume_error(&body).await,
115 None => parse_error_response(&body),
116 };
117 self.conn = None;
118 return Err(err);
119 }
120 _ => {}
121 }
122 }
123 Ok(Some(rows))
124 }
125}
126
127impl Drop for AsyncPreparedQueryStream<'_> {
128 fn drop(&mut self) {
129 if self.finished {
130 return;
131 }
132
133 self.canceller.cancel();
137
138 if let Some(ref mut conn) = self.conn {
139 conn.mark_desynchronized();
140 warn!(
141 target: "hyperdb_api_core::client",
142 "AsyncPreparedQueryStream dropped before completion; \
143 connection marked desynchronized — discard and reconnect",
144 );
145 }
146 }
147}