mssql_client/row_stream.rs
1//! True incremental row streaming: rows pulled from the socket on demand.
2//!
3//! Unlike [`QueryStream`](crate::QueryStream) — which buffers the entire
4//! response in memory and then decodes rows lazily — [`RowStream`] holds the
5//! connection and reads TDS packets only as rows are pulled. Peak memory is
6//! roughly one packet plus one partial row, independent of result-set size, so
7//! a multi-million-row `SELECT` does not have to fit in client memory.
8//!
9//! Returned by [`Client::query_stream`](crate::Client::query_stream). The
10//! stream borrows the client mutably for its lifetime, so no other operation
11//! can run on the connection until the stream is consumed or dropped — natural
12//! backpressure, and the type system enforces the exclusivity.
13//!
14//! ```no_run
15//! # use mssql_client::{Client, Ready, Row};
16//! # async fn ex(client: &mut Client<Ready>) -> Result<(), mssql_client::Error> {
17//! let mut stream = client.query_stream("SELECT id, name FROM big_table", &[]).await?;
18//! while let Some(row) = stream.try_next().await? {
19//! let id: i32 = row.get_by_name("id")?;
20//! let _ = id;
21//! }
22//! # Ok(())
23//! # }
24//! ```
25
26use std::sync::Arc;
27
28use tds_protocol::token::{ColMetaData, Token};
29
30use crate::Client;
31use crate::error::{Error, Result};
32use crate::row::{Column, Row};
33use crate::row_source::{Pull, RowSource};
34use crate::state::{ConnectionState, Ready};
35
36/// An incrementally streamed result set: rows are read from the network as they
37/// are pulled, not buffered up front.
38///
39/// See the [module docs](self) for how this differs from
40/// [`QueryStream`](crate::QueryStream). Obtain one from
41/// [`Client::query_stream`](crate::Client::query_stream).
42///
43/// Pulling is async (each row may require reading another packet), so this type
44/// is consumed via [`try_next`](Self::try_next) / [`collect_all`](Self::collect_all)
45/// rather than the synchronous [`Iterator`] that `QueryStream` offers.
46#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
47pub struct RowStream<'a, S: ConnectionState = Ready> {
48 /// The client whose connection supplies packets. Borrowed for the stream's
49 /// lifetime so no other request can run concurrently. Generic over the
50 /// connection state so the same stream serves both `Ready` and
51 /// `InTransaction` clients.
52 client: &'a mut Client<S>,
53 /// The incremental token decoder over the rolling packet buffer.
54 source: RowSource,
55 /// Columns for the current result set (rebuilt on each ColMetaData).
56 row_meta: Arc<crate::row::ColMetaData>,
57 /// Protocol metadata for decoding raw rows of the current result set.
58 meta: ColMetaData,
59 /// Pre-resolved column decryptor for the current Always Encrypted result set.
60 #[cfg(feature = "always-encrypted")]
61 decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
62 /// Whether the stream has reached the end of the response.
63 finished: bool,
64}
65
66impl<'a, S: ConnectionState> RowStream<'a, S> {
67 /// Construct a stream positioned just after the first ColMetaData, ready to
68 /// yield the result set's rows. Called by `Client::query_stream`.
69 pub(crate) fn new(
70 client: &'a mut Client<S>,
71 source: RowSource,
72 columns: Vec<Column>,
73 meta: ColMetaData,
74 #[cfg(feature = "always-encrypted")] decryptor: Option<
75 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
76 >,
77 ) -> Self {
78 Self {
79 client,
80 source,
81 row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
82 meta,
83 #[cfg(feature = "always-encrypted")]
84 decryptor,
85 finished: false,
86 }
87 }
88
89 /// Construct an already-finished stream (the query produced no result set,
90 /// e.g. an `INSERT`). The caller has already cleared the in-flight flag.
91 pub(crate) fn empty(client: &'a mut Client<S>) -> Self {
92 Self {
93 client,
94 source: RowSource::new(false),
95 row_meta: Arc::new(crate::row::ColMetaData::new(Vec::new())),
96 meta: ColMetaData::default(),
97 #[cfg(feature = "always-encrypted")]
98 decryptor: None,
99 finished: true,
100 }
101 }
102
103 /// The columns of the current result set.
104 #[must_use]
105 pub fn columns(&self) -> &[Column] {
106 &self.row_meta.columns
107 }
108
109 /// Whether the stream has been fully consumed.
110 #[must_use]
111 pub fn is_finished(&self) -> bool {
112 self.finished
113 }
114
115 /// Pull the next row, reading more packets from the connection as needed.
116 ///
117 /// Returns `Ok(None)` once the response is fully drained — at which point
118 /// the connection is clean for the next request. A server error token in
119 /// the stream is surfaced here as [`Error::Server`].
120 pub async fn try_next(&mut self) -> Result<Option<Row>> {
121 if self.finished {
122 return Ok(None);
123 }
124
125 loop {
126 match self.source.pull()? {
127 Pull::Token(Token::Row(raw)) => return Ok(Some(self.decode_raw(&raw)?)),
128 Pull::Token(Token::NbcRow(nbc)) => return Ok(Some(self.decode_nbc(&nbc)?)),
129 Pull::Token(Token::ColMetaData(meta)) => {
130 // A new result set within the same response (multi-statement
131 // batch). Stream its rows flatly, continuing from here.
132 self.switch_result_set(meta).await?;
133 }
134 Pull::Token(Token::Error(err)) => {
135 self.finish();
136 return Err(crate::client::response::server_token_to_error(&err));
137 }
138 Pull::Token(Token::Done(done)) => {
139 if done.status.error {
140 self.finish();
141 return Err(Error::Query(
142 "query failed (server set error flag in DONE token)".to_string(),
143 ));
144 }
145 // Otherwise keep going: rows of another result set, or the
146 // final DONE followed by Pull::End, may still come.
147 }
148 Pull::Token(Token::EnvChange(env)) => {
149 // Keep the transaction descriptor in sync with raw
150 // BEGIN/COMMIT/ROLLBACK seen mid-stream, as the buffered
151 // readers do.
152 self.client.apply_transaction_env_change(&env);
153 }
154 Pull::Token(_) => {
155 // Info / Order / DoneProc / DoneInProc, etc.
156 // Not row data; keep pulling.
157 }
158 Pull::NeedMore => match self.client.read_response_packet().await? {
159 Some((payload, is_eom)) => self.source.push_packet(payload, is_eom),
160 None => {
161 self.finish();
162 return Err(Error::ConnectionClosed);
163 }
164 },
165 Pull::End => {
166 self.finish();
167 return Ok(None);
168 }
169 }
170 }
171 }
172
173 /// Drain the remaining rows into a vector.
174 ///
175 /// For large result sets prefer [`try_next`](Self::try_next) — this loads
176 /// every remaining row into memory at once.
177 pub async fn collect_all(mut self) -> Result<Vec<Row>> {
178 let mut out = Vec::new();
179 while let Some(row) = self.try_next().await? {
180 out.push(row);
181 }
182 Ok(out)
183 }
184
185 /// Stop the stream early and leave the connection reusable.
186 ///
187 /// Sends an Attention to the server and drains to its acknowledgement so the
188 /// connection is clean for the next request — the correct way to abandon a
189 /// large result set you no longer need.
190 ///
191 /// Calling this is optional: simply **dropping** a partially-read stream is
192 /// safe but leaves the connection marked in-flight, so a pooled connection
193 /// is discarded on return and a directly reused client recovers it (with an
194 /// Attention/drain) on its next request. `cancel` avoids that discard and
195 /// reports any error from the cancellation.
196 pub async fn cancel(mut self) -> Result<()> {
197 if self.finished {
198 return Ok(());
199 }
200 self.finished = true;
201 self.client.cancel_in_flight_response().await
202 }
203
204 /// Mark the stream finished and the connection clean for the next request.
205 fn finish(&mut self) {
206 self.finished = true;
207 self.client.note_response_drained();
208 }
209
210 /// Adopt a new result set's metadata mid-stream (multi-statement batch).
211 async fn switch_result_set(&mut self, meta: ColMetaData) -> Result<()> {
212 self.row_meta = Arc::new(crate::row::ColMetaData::new(Client::<S>::build_columns(
213 &meta,
214 )));
215 #[cfg(feature = "always-encrypted")]
216 {
217 self.decryptor = self
218 .client
219 .resolve_decryptor(&meta)
220 .await?
221 .map(std::sync::Arc::new);
222 }
223 self.meta = meta;
224 Ok(())
225 }
226
227 /// Decode a raw row against the current result set's metadata.
228 fn decode_raw(&self, raw: &tds_protocol::token::RawRow) -> Result<Row> {
229 #[cfg(feature = "always-encrypted")]
230 if let Some(ref dec) = self.decryptor {
231 return crate::column_parser::convert_raw_row_decrypted(
232 raw,
233 &self.meta,
234 &self.row_meta,
235 dec,
236 );
237 }
238 crate::column_parser::convert_raw_row(raw, &self.meta, &self.row_meta)
239 }
240
241 /// Decode a null-bitmap-compressed row against the current metadata.
242 fn decode_nbc(&self, nbc: &tds_protocol::token::NbcRow) -> Result<Row> {
243 #[cfg(feature = "always-encrypted")]
244 if let Some(ref dec) = self.decryptor {
245 return crate::column_parser::convert_nbc_row_decrypted(
246 nbc,
247 &self.meta,
248 &self.row_meta,
249 dec,
250 );
251 }
252 crate::column_parser::convert_nbc_row(nbc, &self.meta, &self.row_meta)
253 }
254}