mssql_client/row_stream.rs
//! True incremental row streaming: rows pulled from the socket on demand.
//!
//! Unlike [`QueryStream`](crate::QueryStream) — which buffers the entire
//! response in memory and then decodes rows lazily — [`RowStream`] holds the
//! connection and reads TDS packets only as rows are pulled. Peak memory is
//! roughly one packet plus one partial row, independent of result-set size, so
//! a multi-million-row `SELECT` does not have to fit in client memory.
//!
//! Returned by [`Client::query_stream`](crate::Client::query_stream). The
//! stream borrows the client mutably for its lifetime, so no other operation
//! can run on the connection until the stream is consumed or dropped — natural
//! backpressure, and the type system enforces the exclusivity.
//!
//! ```no_run
//! # use mssql_client::{Client, Ready, Row};
//! # async fn ex(client: &mut Client<Ready>) -> Result<(), mssql_client::Error> {
//! let mut stream = client.query_stream("SELECT id, name FROM big_table", &[]).await?;
//! while let Some(row) = stream.try_next().await? {
//!     let id: i32 = row.get_by_name("id")?;
//!     let _ = id;
//! }
//! # Ok(())
//! # }
//! ```
25
26use tds_protocol::token::{ColMetaData, Token};
27
28use crate::Client;
29use crate::error::{Error, Result};
30use crate::row::{Column, Row};
31use crate::row_source::{Pull, RowSource};
32use crate::state::{ConnectionState, Ready};
33
34/// An incrementally streamed result set: rows are read from the network as they
35/// are pulled, not buffered up front.
36///
37/// See the [module docs](self) for how this differs from
38/// [`QueryStream`](crate::QueryStream). Obtain one from
39/// [`Client::query_stream`](crate::Client::query_stream).
40///
41/// Pulling is async (each row may require reading another packet), so this type
42/// is consumed via [`try_next`](Self::try_next) / [`collect_all`](Self::collect_all)
43/// rather than the synchronous [`Iterator`] that `QueryStream` offers.
44#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
45pub struct RowStream<'a, S: ConnectionState = Ready> {
46 /// The client whose connection supplies packets. Borrowed for the stream's
47 /// lifetime so no other request can run concurrently. Generic over the
48 /// connection state so the same stream serves both `Ready` and
49 /// `InTransaction` clients.
50 client: &'a mut Client<S>,
51 /// The incremental token decoder over the rolling packet buffer.
52 source: RowSource,
53 /// Columns for the current result set (rebuilt on each ColMetaData).
54 columns: Vec<Column>,
55 /// Protocol metadata for decoding raw rows of the current result set.
56 meta: ColMetaData,
57 /// Pre-resolved column decryptor for the current Always Encrypted result set.
58 #[cfg(feature = "always-encrypted")]
59 decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
60 /// Whether the stream has reached the end of the response.
61 finished: bool,
62}
63
64impl<'a, S: ConnectionState> RowStream<'a, S> {
65 /// Construct a stream positioned just after the first ColMetaData, ready to
66 /// yield the result set's rows. Called by `Client::query_stream`.
67 pub(crate) fn new(
68 client: &'a mut Client<S>,
69 source: RowSource,
70 columns: Vec<Column>,
71 meta: ColMetaData,
72 #[cfg(feature = "always-encrypted")] decryptor: Option<
73 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
74 >,
75 ) -> Self {
76 Self {
77 client,
78 source,
79 columns,
80 meta,
81 #[cfg(feature = "always-encrypted")]
82 decryptor,
83 finished: false,
84 }
85 }
86
87 /// Construct an already-finished stream (the query produced no result set,
88 /// e.g. an `INSERT`). The caller has already cleared the in-flight flag.
89 pub(crate) fn empty(client: &'a mut Client<S>) -> Self {
90 Self {
91 client,
92 source: RowSource::new(false),
93 columns: Vec::new(),
94 meta: ColMetaData::default(),
95 #[cfg(feature = "always-encrypted")]
96 decryptor: None,
97 finished: true,
98 }
99 }
100
101 /// The columns of the current result set.
102 #[must_use]
103 pub fn columns(&self) -> &[Column] {
104 &self.columns
105 }
106
107 /// Whether the stream has been fully consumed.
108 #[must_use]
109 pub fn is_finished(&self) -> bool {
110 self.finished
111 }
112
113 /// Pull the next row, reading more packets from the connection as needed.
114 ///
115 /// Returns `Ok(None)` once the response is fully drained — at which point
116 /// the connection is clean for the next request. A server error token in
117 /// the stream is surfaced here as [`Error::Server`].
118 pub async fn try_next(&mut self) -> Result<Option<Row>> {
119 if self.finished {
120 return Ok(None);
121 }
122
123 loop {
124 match self.source.pull()? {
125 Pull::Token(Token::Row(raw)) => return Ok(Some(self.decode_raw(&raw)?)),
126 Pull::Token(Token::NbcRow(nbc)) => return Ok(Some(self.decode_nbc(&nbc)?)),
127 Pull::Token(Token::ColMetaData(meta)) => {
128 // A new result set within the same response (multi-statement
129 // batch). Stream its rows flatly, continuing from here.
130 self.switch_result_set(meta).await?;
131 }
132 Pull::Token(Token::Error(err)) => {
133 self.finish();
134 return Err(crate::client::response::server_token_to_error(&err));
135 }
136 Pull::Token(Token::Done(done)) => {
137 if done.status.error {
138 self.finish();
139 return Err(Error::Query(
140 "query failed (server set error flag in DONE token)".to_string(),
141 ));
142 }
143 // Otherwise keep going: rows of another result set, or the
144 // final DONE followed by Pull::End, may still come.
145 }
146 Pull::Token(Token::EnvChange(env)) => {
147 // Keep the transaction descriptor in sync with raw
148 // BEGIN/COMMIT/ROLLBACK seen mid-stream, as the buffered
149 // readers do.
150 self.client.apply_transaction_env_change(&env);
151 }
152 Pull::Token(_) => {
153 // Info / Order / DoneProc / DoneInProc, etc.
154 // Not row data; keep pulling.
155 }
156 Pull::NeedMore => match self.client.read_response_packet().await? {
157 Some((payload, is_eom)) => self.source.push_packet(payload, is_eom),
158 None => {
159 self.finish();
160 return Err(Error::ConnectionClosed);
161 }
162 },
163 Pull::End => {
164 self.finish();
165 return Ok(None);
166 }
167 }
168 }
169 }
170
171 /// Drain the remaining rows into a vector.
172 ///
173 /// For large result sets prefer [`try_next`](Self::try_next) — this loads
174 /// every remaining row into memory at once.
175 pub async fn collect_all(mut self) -> Result<Vec<Row>> {
176 let mut out = Vec::new();
177 while let Some(row) = self.try_next().await? {
178 out.push(row);
179 }
180 Ok(out)
181 }
182
183 /// Stop the stream early and leave the connection reusable.
184 ///
185 /// Sends an Attention to the server and drains to its acknowledgement so the
186 /// connection is clean for the next request — the correct way to abandon a
187 /// large result set you no longer need.
188 ///
189 /// Calling this is optional: simply **dropping** a partially-read stream is
190 /// safe but leaves the connection marked in-flight, so a pooled connection
191 /// is discarded on return and a directly reused client recovers it (with an
192 /// Attention/drain) on its next request. `cancel` avoids that discard and
193 /// reports any error from the cancellation.
194 pub async fn cancel(mut self) -> Result<()> {
195 if self.finished {
196 return Ok(());
197 }
198 self.finished = true;
199 self.client.cancel_in_flight_response().await
200 }
201
202 /// Mark the stream finished and the connection clean for the next request.
203 fn finish(&mut self) {
204 self.finished = true;
205 self.client.note_response_drained();
206 }
207
208 /// Adopt a new result set's metadata mid-stream (multi-statement batch).
209 async fn switch_result_set(&mut self, meta: ColMetaData) -> Result<()> {
210 self.columns = Client::<S>::build_columns(&meta);
211 #[cfg(feature = "always-encrypted")]
212 {
213 self.decryptor = self
214 .client
215 .resolve_decryptor(&meta)
216 .await?
217 .map(std::sync::Arc::new);
218 }
219 self.meta = meta;
220 Ok(())
221 }
222
223 /// Decode a raw row against the current result set's metadata.
224 fn decode_raw(&self, raw: &tds_protocol::token::RawRow) -> Result<Row> {
225 #[cfg(feature = "always-encrypted")]
226 if let Some(ref dec) = self.decryptor {
227 return crate::column_parser::convert_raw_row_decrypted(
228 raw,
229 &self.meta,
230 &self.columns,
231 dec,
232 );
233 }
234 crate::column_parser::convert_raw_row(raw, &self.meta, &self.columns)
235 }
236
237 /// Decode a null-bitmap-compressed row against the current metadata.
238 fn decode_nbc(&self, nbc: &tds_protocol::token::NbcRow) -> Result<Row> {
239 #[cfg(feature = "always-encrypted")]
240 if let Some(ref dec) = self.decryptor {
241 return crate::column_parser::convert_nbc_row_decrypted(
242 nbc,
243 &self.meta,
244 &self.columns,
245 dec,
246 );
247 }
248 crate::column_parser::convert_nbc_row(nbc, &self.meta, &self.columns)
249 }
250}