Skip to main content

mssql_client/
row_stream.rs

1//! True incremental row streaming: rows pulled from the socket on demand.
2//!
3//! Unlike [`QueryStream`](crate::QueryStream) — which buffers the entire
4//! response in memory and then decodes rows lazily — [`RowStream`] holds the
5//! connection and reads TDS packets only as rows are pulled. Peak memory is
6//! roughly one packet plus one partial row, independent of result-set size, so
7//! a multi-million-row `SELECT` does not have to fit in client memory.
8//!
9//! Returned by [`Client::query_stream`](crate::Client::query_stream). The
10//! stream borrows the client mutably for its lifetime, so no other operation
11//! can run on the connection until the stream is consumed or dropped — natural
12//! backpressure, and the type system enforces the exclusivity.
13//!
14//! ```no_run
15//! # use mssql_client::{Client, Ready, Row};
16//! # async fn ex(client: &mut Client<Ready>) -> Result<(), mssql_client::Error> {
17//! let mut stream = client.query_stream("SELECT id, name FROM big_table", &[]).await?;
18//! while let Some(row) = stream.try_next().await? {
19//!     let id: i32 = row.get_by_name("id")?;
20//!     let _ = id;
21//! }
22//! # Ok(())
23//! # }
24//! ```
25
26use std::sync::Arc;
27
28use tds_protocol::token::{ColMetaData, Token};
29
30use crate::Client;
31use crate::error::{Error, Result};
32use crate::row::{Column, Row};
33use crate::row_source::{Pull, RowSource};
34use crate::state::{ConnectionState, Ready};
35
36/// An incrementally streamed result set: rows are read from the network as they
37/// are pulled, not buffered up front.
38///
39/// See the [module docs](self) for how this differs from
40/// [`QueryStream`](crate::QueryStream). Obtain one from
41/// [`Client::query_stream`](crate::Client::query_stream).
42///
43/// Pulling is async (each row may require reading another packet), so this type
44/// is consumed via [`try_next`](Self::try_next) / [`collect_all`](Self::collect_all)
45/// rather than the synchronous [`Iterator`] that `QueryStream` offers.
46#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
47pub struct RowStream<'a, S: ConnectionState = Ready> {
48    /// The client whose connection supplies packets. Borrowed for the stream's
49    /// lifetime so no other request can run concurrently. Generic over the
50    /// connection state so the same stream serves both `Ready` and
51    /// `InTransaction` clients.
52    client: &'a mut Client<S>,
53    /// The incremental token decoder over the rolling packet buffer.
54    source: RowSource,
55    /// Columns for the current result set (rebuilt on each ColMetaData).
56    row_meta: Arc<crate::row::ColMetaData>,
57    /// Protocol metadata for decoding raw rows of the current result set.
58    meta: ColMetaData,
59    /// Pre-resolved column decryptor for the current Always Encrypted result set.
60    #[cfg(feature = "always-encrypted")]
61    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
62    /// Whether the stream has reached the end of the response.
63    finished: bool,
64}
65
66impl<'a, S: ConnectionState> RowStream<'a, S> {
67    /// Construct a stream positioned just after the first ColMetaData, ready to
68    /// yield the result set's rows. Called by `Client::query_stream`.
69    pub(crate) fn new(
70        client: &'a mut Client<S>,
71        source: RowSource,
72        columns: Vec<Column>,
73        meta: ColMetaData,
74        #[cfg(feature = "always-encrypted")] decryptor: Option<
75            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
76        >,
77    ) -> Self {
78        Self {
79            client,
80            source,
81            row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
82            meta,
83            #[cfg(feature = "always-encrypted")]
84            decryptor,
85            finished: false,
86        }
87    }
88
89    /// Construct an already-finished stream (the query produced no result set,
90    /// e.g. an `INSERT`). The caller has already cleared the in-flight flag.
91    pub(crate) fn empty(client: &'a mut Client<S>) -> Self {
92        Self {
93            client,
94            source: RowSource::new(false),
95            row_meta: Arc::new(crate::row::ColMetaData::new(Vec::new())),
96            meta: ColMetaData::default(),
97            #[cfg(feature = "always-encrypted")]
98            decryptor: None,
99            finished: true,
100        }
101    }
102
103    /// The columns of the current result set.
104    #[must_use]
105    pub fn columns(&self) -> &[Column] {
106        &self.row_meta.columns
107    }
108
109    /// Whether the stream has been fully consumed.
110    #[must_use]
111    pub fn is_finished(&self) -> bool {
112        self.finished
113    }
114
115    /// Pull the next row, reading more packets from the connection as needed.
116    ///
117    /// Returns `Ok(None)` once the response is fully drained — at which point
118    /// the connection is clean for the next request. A server error token in
119    /// the stream is surfaced here as [`Error::Server`].
120    pub async fn try_next(&mut self) -> Result<Option<Row>> {
121        if self.finished {
122            return Ok(None);
123        }
124
125        loop {
126            match self.source.pull()? {
127                Pull::Token(Token::Row(raw)) => return Ok(Some(self.decode_raw(&raw)?)),
128                Pull::Token(Token::NbcRow(nbc)) => return Ok(Some(self.decode_nbc(&nbc)?)),
129                Pull::Token(Token::ColMetaData(meta)) => {
130                    // A new result set within the same response (multi-statement
131                    // batch). Stream its rows flatly, continuing from here.
132                    self.switch_result_set(meta).await?;
133                }
134                Pull::Token(Token::Error(err)) => {
135                    self.finish();
136                    return Err(crate::client::response::server_token_to_error(&err));
137                }
138                Pull::Token(Token::Done(done)) => {
139                    if done.status.error {
140                        self.finish();
141                        return Err(Error::Query(
142                            "query failed (server set error flag in DONE token)".to_string(),
143                        ));
144                    }
145                    // Otherwise keep going: rows of another result set, or the
146                    // final DONE followed by Pull::End, may still come.
147                }
148                Pull::Token(Token::EnvChange(env)) => {
149                    // Keep the transaction descriptor in sync with raw
150                    // BEGIN/COMMIT/ROLLBACK seen mid-stream, as the buffered
151                    // readers do.
152                    self.client.apply_transaction_env_change(&env);
153                }
154                Pull::Token(_) => {
155                    // Info / Order / DoneProc / DoneInProc, etc.
156                    // Not row data; keep pulling.
157                }
158                Pull::NeedMore => match self.client.read_response_packet().await? {
159                    Some((payload, is_eom)) => self.source.push_packet(payload, is_eom),
160                    None => {
161                        self.finish();
162                        return Err(Error::ConnectionClosed);
163                    }
164                },
165                Pull::End => {
166                    self.finish();
167                    return Ok(None);
168                }
169            }
170        }
171    }
172
173    /// Drain the remaining rows into a vector.
174    ///
175    /// For large result sets prefer [`try_next`](Self::try_next) — this loads
176    /// every remaining row into memory at once.
177    pub async fn collect_all(mut self) -> Result<Vec<Row>> {
178        let mut out = Vec::new();
179        while let Some(row) = self.try_next().await? {
180            out.push(row);
181        }
182        Ok(out)
183    }
184
185    /// Stop the stream early and leave the connection reusable.
186    ///
187    /// Sends an Attention to the server and drains to its acknowledgement so the
188    /// connection is clean for the next request — the correct way to abandon a
189    /// large result set you no longer need.
190    ///
191    /// Calling this is optional: simply **dropping** a partially-read stream is
192    /// safe but leaves the connection marked in-flight, so a pooled connection
193    /// is discarded on return and a directly reused client recovers it (with an
194    /// Attention/drain) on its next request. `cancel` avoids that discard and
195    /// reports any error from the cancellation.
196    pub async fn cancel(mut self) -> Result<()> {
197        if self.finished {
198            return Ok(());
199        }
200        self.finished = true;
201        self.client.cancel_in_flight_response().await
202    }
203
204    /// Mark the stream finished and the connection clean for the next request.
205    fn finish(&mut self) {
206        self.finished = true;
207        self.client.note_response_drained();
208    }
209
210    /// Adopt a new result set's metadata mid-stream (multi-statement batch).
211    async fn switch_result_set(&mut self, meta: ColMetaData) -> Result<()> {
212        self.row_meta = Arc::new(crate::row::ColMetaData::new(Client::<S>::build_columns(
213            &meta,
214        )));
215        #[cfg(feature = "always-encrypted")]
216        {
217            self.decryptor = self
218                .client
219                .resolve_decryptor(&meta)
220                .await?
221                .map(std::sync::Arc::new);
222        }
223        self.meta = meta;
224        Ok(())
225    }
226
227    /// Decode a raw row against the current result set's metadata.
228    fn decode_raw(&self, raw: &tds_protocol::token::RawRow) -> Result<Row> {
229        #[cfg(feature = "always-encrypted")]
230        if let Some(ref dec) = self.decryptor {
231            return crate::column_parser::convert_raw_row_decrypted(
232                raw,
233                &self.meta,
234                &self.row_meta,
235                dec,
236            );
237        }
238        crate::column_parser::convert_raw_row(raw, &self.meta, &self.row_meta)
239    }
240
241    /// Decode a null-bitmap-compressed row against the current metadata.
242    fn decode_nbc(&self, nbc: &tds_protocol::token::NbcRow) -> Result<Row> {
243        #[cfg(feature = "always-encrypted")]
244        if let Some(ref dec) = self.decryptor {
245            return crate::column_parser::convert_nbc_row_decrypted(
246                nbc,
247                &self.meta,
248                &self.row_meta,
249                dec,
250            );
251        }
252        crate::column_parser::convert_nbc_row(nbc, &self.meta, &self.row_meta)
253    }
254}