Skip to main content

mssql_client/
row_stream.rs

//! True incremental row streaming: rows are pulled from the socket on demand.
//!
//! Unlike [`QueryStream`](crate::QueryStream) — which buffers the entire
//! response in memory and then decodes rows lazily — [`RowStream`] holds the
//! connection and reads TDS packets only as rows are pulled. Peak memory is
//! roughly one packet plus one partial row, independent of result-set size, so
//! a multi-million-row `SELECT` does not have to fit in client memory.
//!
//! Returned by [`Client::query_stream`](crate::Client::query_stream). The
//! stream borrows the client mutably for its lifetime, so no other operation
//! can run on the connection until the stream is consumed or dropped. This
//! gives natural backpressure, and the type system enforces the exclusivity.
//!
//! ```no_run
//! # use mssql_client::{Client, Ready, Row};
//! # async fn ex(client: &mut Client<Ready>) -> Result<(), mssql_client::Error> {
//! let mut stream = client.query_stream("SELECT id, name FROM big_table", &[]).await?;
//! while let Some(row) = stream.try_next().await? {
//!     let id: i32 = row.get_by_name("id")?;
//!     let _ = id;
//! }
//! # Ok(())
//! # }
//! ```
25
26use tds_protocol::token::{ColMetaData, Token};
27
28use crate::Client;
29use crate::error::{Error, Result};
30use crate::row::{Column, Row};
31use crate::row_source::{Pull, RowSource};
32use crate::state::{ConnectionState, Ready};
33
34/// An incrementally streamed result set: rows are read from the network as they
35/// are pulled, not buffered up front.
36///
37/// See the [module docs](self) for how this differs from
38/// [`QueryStream`](crate::QueryStream). Obtain one from
39/// [`Client::query_stream`](crate::Client::query_stream).
40///
41/// Pulling is async (each row may require reading another packet), so this type
42/// is consumed via [`try_next`](Self::try_next) / [`collect_all`](Self::collect_all)
43/// rather than the synchronous [`Iterator`] that `QueryStream` offers.
44#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
45pub struct RowStream<'a, S: ConnectionState = Ready> {
46    /// The client whose connection supplies packets. Borrowed for the stream's
47    /// lifetime so no other request can run concurrently. Generic over the
48    /// connection state so the same stream serves both `Ready` and
49    /// `InTransaction` clients.
50    client: &'a mut Client<S>,
51    /// The incremental token decoder over the rolling packet buffer.
52    source: RowSource,
53    /// Columns for the current result set (rebuilt on each ColMetaData).
54    columns: Vec<Column>,
55    /// Protocol metadata for decoding raw rows of the current result set.
56    meta: ColMetaData,
57    /// Pre-resolved column decryptor for the current Always Encrypted result set.
58    #[cfg(feature = "always-encrypted")]
59    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
60    /// Whether the stream has reached the end of the response.
61    finished: bool,
62}
63
64impl<'a, S: ConnectionState> RowStream<'a, S> {
65    /// Construct a stream positioned just after the first ColMetaData, ready to
66    /// yield the result set's rows. Called by `Client::query_stream`.
67    pub(crate) fn new(
68        client: &'a mut Client<S>,
69        source: RowSource,
70        columns: Vec<Column>,
71        meta: ColMetaData,
72        #[cfg(feature = "always-encrypted")] decryptor: Option<
73            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
74        >,
75    ) -> Self {
76        Self {
77            client,
78            source,
79            columns,
80            meta,
81            #[cfg(feature = "always-encrypted")]
82            decryptor,
83            finished: false,
84        }
85    }
86
87    /// Construct an already-finished stream (the query produced no result set,
88    /// e.g. an `INSERT`). The caller has already cleared the in-flight flag.
89    pub(crate) fn empty(client: &'a mut Client<S>) -> Self {
90        Self {
91            client,
92            source: RowSource::new(false),
93            columns: Vec::new(),
94            meta: ColMetaData::default(),
95            #[cfg(feature = "always-encrypted")]
96            decryptor: None,
97            finished: true,
98        }
99    }
100
101    /// The columns of the current result set.
102    #[must_use]
103    pub fn columns(&self) -> &[Column] {
104        &self.columns
105    }
106
107    /// Whether the stream has been fully consumed.
108    #[must_use]
109    pub fn is_finished(&self) -> bool {
110        self.finished
111    }
112
113    /// Pull the next row, reading more packets from the connection as needed.
114    ///
115    /// Returns `Ok(None)` once the response is fully drained — at which point
116    /// the connection is clean for the next request. A server error token in
117    /// the stream is surfaced here as [`Error::Server`].
118    pub async fn try_next(&mut self) -> Result<Option<Row>> {
119        if self.finished {
120            return Ok(None);
121        }
122
123        loop {
124            match self.source.pull()? {
125                Pull::Token(Token::Row(raw)) => return Ok(Some(self.decode_raw(&raw)?)),
126                Pull::Token(Token::NbcRow(nbc)) => return Ok(Some(self.decode_nbc(&nbc)?)),
127                Pull::Token(Token::ColMetaData(meta)) => {
128                    // A new result set within the same response (multi-statement
129                    // batch). Stream its rows flatly, continuing from here.
130                    self.switch_result_set(meta).await?;
131                }
132                Pull::Token(Token::Error(err)) => {
133                    self.finish();
134                    return Err(crate::client::response::server_token_to_error(&err));
135                }
136                Pull::Token(Token::Done(done)) => {
137                    if done.status.error {
138                        self.finish();
139                        return Err(Error::Query(
140                            "query failed (server set error flag in DONE token)".to_string(),
141                        ));
142                    }
143                    // Otherwise keep going: rows of another result set, or the
144                    // final DONE followed by Pull::End, may still come.
145                }
146                Pull::Token(Token::EnvChange(env)) => {
147                    // Keep the transaction descriptor in sync with raw
148                    // BEGIN/COMMIT/ROLLBACK seen mid-stream, as the buffered
149                    // readers do.
150                    self.client.apply_transaction_env_change(&env);
151                }
152                Pull::Token(_) => {
153                    // Info / Order / DoneProc / DoneInProc, etc.
154                    // Not row data; keep pulling.
155                }
156                Pull::NeedMore => match self.client.read_response_packet().await? {
157                    Some((payload, is_eom)) => self.source.push_packet(payload, is_eom),
158                    None => {
159                        self.finish();
160                        return Err(Error::ConnectionClosed);
161                    }
162                },
163                Pull::End => {
164                    self.finish();
165                    return Ok(None);
166                }
167            }
168        }
169    }
170
171    /// Drain the remaining rows into a vector.
172    ///
173    /// For large result sets prefer [`try_next`](Self::try_next) — this loads
174    /// every remaining row into memory at once.
175    pub async fn collect_all(mut self) -> Result<Vec<Row>> {
176        let mut out = Vec::new();
177        while let Some(row) = self.try_next().await? {
178            out.push(row);
179        }
180        Ok(out)
181    }
182
183    /// Stop the stream early and leave the connection reusable.
184    ///
185    /// Sends an Attention to the server and drains to its acknowledgement so the
186    /// connection is clean for the next request — the correct way to abandon a
187    /// large result set you no longer need.
188    ///
189    /// Calling this is optional: simply **dropping** a partially-read stream is
190    /// safe but leaves the connection marked in-flight, so a pooled connection
191    /// is discarded on return and a directly reused client recovers it (with an
192    /// Attention/drain) on its next request. `cancel` avoids that discard and
193    /// reports any error from the cancellation.
194    pub async fn cancel(mut self) -> Result<()> {
195        if self.finished {
196            return Ok(());
197        }
198        self.finished = true;
199        self.client.cancel_in_flight_response().await
200    }
201
202    /// Mark the stream finished and the connection clean for the next request.
203    fn finish(&mut self) {
204        self.finished = true;
205        self.client.note_response_drained();
206    }
207
208    /// Adopt a new result set's metadata mid-stream (multi-statement batch).
209    async fn switch_result_set(&mut self, meta: ColMetaData) -> Result<()> {
210        self.columns = Client::<S>::build_columns(&meta);
211        #[cfg(feature = "always-encrypted")]
212        {
213            self.decryptor = self
214                .client
215                .resolve_decryptor(&meta)
216                .await?
217                .map(std::sync::Arc::new);
218        }
219        self.meta = meta;
220        Ok(())
221    }
222
223    /// Decode a raw row against the current result set's metadata.
224    fn decode_raw(&self, raw: &tds_protocol::token::RawRow) -> Result<Row> {
225        #[cfg(feature = "always-encrypted")]
226        if let Some(ref dec) = self.decryptor {
227            return crate::column_parser::convert_raw_row_decrypted(
228                raw,
229                &self.meta,
230                &self.columns,
231                dec,
232            );
233        }
234        crate::column_parser::convert_raw_row(raw, &self.meta, &self.columns)
235    }
236
237    /// Decode a null-bitmap-compressed row against the current metadata.
238    fn decode_nbc(&self, nbc: &tds_protocol::token::NbcRow) -> Result<Row> {
239        #[cfg(feature = "always-encrypted")]
240        if let Some(ref dec) = self.decryptor {
241            return crate::column_parser::convert_nbc_row_decrypted(
242                nbc,
243                &self.meta,
244                &self.columns,
245                dec,
246            );
247        }
248        crate::column_parser::convert_nbc_row(nbc, &self.meta, &self.columns)
249    }
250}