Skip to main content

mssql_client/
stream.rs

1//! Query result sets with lazy per-row decoding.
2//!
3//! The full server response is buffered in memory first; rows are then
4//! *decoded* lazily as callers pull them. This is not incremental
5//! network streaming — see the next section for what that means for memory.
6//!
7//! ## Buffered vs True Streaming
8//!
9//! The underlying TDS response is reassembled into a single [`bytes::Bytes`]
10//! payload by [`mssql-codec`](mssql_codec). That payload is handed to the
11//! token parser which walks it once and enqueues each row's raw byte slice
12//! (a cheap, refcounted slice into the original [`bytes::Bytes`] per ADR-004)
13//! into the stream. Individual [`Row`]s are then decoded lazily when callers
14//! pull them — either via the [`Stream`]/[`Iterator`] impls or
15//! [`QueryStream::collect_all`].
16//!
17//! This "lazy decode" pattern keeps peak memory at roughly the size of the
18//! raw payload instead of payload + fully-typed `Vec<Row>`. Users who iterate
19//! and drop each [`Row`] see memory proportional to a single row at a time
20//! plus the shared raw payload. Users who `collect_all()` pay for the full
21//! `Vec<Row>` just like before.
22//!
23//! The same lazy-decode pattern applies to [`MultiResultStream`],
24//! [`ResultSet`], and [`ProcedureResult::result_sets`]: raw row bytes are
25//! stashed during response read and each [`Row`] is decoded when the caller
26//! pulls it. Because decoding can fail per row, [`ResultSet::next_row`]
27//! returns `Option<Result<Row, Error>>` rather than `Option<Row>` — callers
28//! observe decode errors at iteration time instead of at
29//! `call_procedure().await?` / `query_multiple().await?`.
30//!
31//! For truly large result sets that should not be buffered at all, use
32//! [`Client::query_stream`](crate::Client::query_stream), which reads packets
33//! from the network on demand (peak memory ~one row), or page with OFFSET/FETCH.
34
35use std::collections::VecDeque;
36use std::pin::Pin;
37use std::sync::Arc;
38use std::task::{Context, Poll};
39
40use futures_core::Stream;
41use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
42
43use crate::error::Error;
44use crate::row::{Column, Row};
45
46/// A row that may be already decoded or still held as raw TDS bytes.
47///
48/// The lazy-parse query path enqueues raw rows (cheap `Bytes` slices into
49/// the original response payload) and decodes them on demand. The eager
50/// path used by tests and [`MultiResultStream::into_query_streams`] wraps
51/// already-decoded rows.
52#[derive(Debug, Clone)]
53pub(crate) enum PendingRow {
54    /// Already-decoded row (eager path — tests + `MultiResultStream` compat).
55    Parsed(Row),
56    /// Raw TDS row bytes, to be decoded on pull.
57    Raw(RawRow),
58    /// Null-bitmap-compressed row bytes, to be decoded on pull.
59    Nbc(NbcRow),
60}
61
62/// A result set from a query, yielding rows one at a time.
63///
64/// The complete server response is already buffered in memory by the time
65/// this is returned; each [`Row`] is *decoded* lazily as it is pulled, not
66/// fetched incrementally from the network. Peak memory is therefore roughly
67/// the size of the raw response payload regardless of how you iterate. For
68/// genuinely large result sets, use
69/// [`Client::query_stream`](crate::Client::query_stream) (incremental, peak
70/// memory ~one row) or page with `OFFSET`/`FETCH` in SQL rather than relying on
71/// this type to bound memory.
72///
73/// # Example
74///
75/// ```rust,no_run
76/// # use mssql_client::Row;
77/// # fn process_row(_: &Row) {}
78/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
79/// let stream = client.query("SELECT * FROM large_table", &[]).await?;
80/// for row in stream {
81///     let row = row?;
82///     process_row(&row);
83/// }
84/// # Ok(())
85/// # }
86/// ```
87#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
88pub struct QueryStream<'a> {
89    /// Column metadata for the result set.
90    row_meta: Arc<crate::row::ColMetaData>,
91    /// Buffered rows (typed or raw) from the response.
92    rows: VecDeque<PendingRow>,
93    /// Protocol metadata needed to decode raw rows. `None` if every pending
94    /// row is already [`PendingRow::Parsed`].
95    meta: Option<ColMetaData>,
96    /// Pre-resolved column decryptor for Always Encrypted result sets.
97    ///
98    /// Wrapped in `Arc` so lazy result-set containers (`ResultSet`) can share
99    /// this state without duplicating the derived-key material.
100    #[cfg(feature = "always-encrypted")]
101    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
102    /// Whether the stream has completed.
103    finished: bool,
104    /// Lifetime tied to the connection.
105    _marker: std::marker::PhantomData<&'a ()>,
106}
107
108impl QueryStream<'_> {
109    /// Create a new query stream from already-decoded rows.
110    ///
111    /// This is the eager constructor used by unit tests. Production query
112    /// paths use [`QueryStream::from_raw`] to defer row decoding.
113    /// [`MultiResultStream::into_query_streams`] uses
114    /// [`ResultSet::into_query_stream`] to preserve pending-row state.
115    #[cfg(test)]
116    pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
117        Self {
118            row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
119            rows: rows.into_iter().map(PendingRow::Parsed).collect(),
120            meta: None,
121            #[cfg(feature = "always-encrypted")]
122            decryptor: None,
123            finished: false,
124            _marker: std::marker::PhantomData,
125        }
126    }
127
128    /// Create a query stream from raw row bytes and protocol metadata.
129    ///
130    /// Rows are decoded on demand as the stream is pulled. The `meta` must
131    /// describe every row in `pending`. If decryption is configured,
132    /// `decryptor` must cover the same column set.
133    pub(crate) fn from_raw(
134        columns: Vec<Column>,
135        pending: Vec<PendingRow>,
136        meta: ColMetaData,
137        #[cfg(feature = "always-encrypted")] decryptor: Option<
138            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
139        >,
140    ) -> Self {
141        Self {
142            row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
143            rows: pending.into(),
144            meta: Some(meta),
145            #[cfg(feature = "always-encrypted")]
146            decryptor,
147            finished: false,
148            _marker: std::marker::PhantomData,
149        }
150    }
151
152    /// Create an empty query stream (no results).
153    #[allow(dead_code)]
154    pub(crate) fn empty() -> Self {
155        Self {
156            row_meta: Arc::new(crate::row::ColMetaData::new(Vec::new())),
157            rows: VecDeque::new(),
158            meta: None,
159            #[cfg(feature = "always-encrypted")]
160            decryptor: None,
161            finished: true,
162            _marker: std::marker::PhantomData,
163        }
164    }
165
166    /// Get the column metadata for this result set.
167    #[must_use]
168    pub fn columns(&self) -> &[Column] {
169        &self.row_meta.columns
170    }
171
172    /// Check if the stream has finished.
173    #[must_use]
174    pub fn is_finished(&self) -> bool {
175        self.finished
176    }
177
178    /// Get the number of rows remaining in the buffer.
179    #[must_use]
180    pub fn rows_remaining(&self) -> usize {
181        self.rows.len()
182    }
183
184    /// Collect all remaining rows into a vector.
185    ///
186    /// This consumes the stream and loads all rows into memory. Each row is
187    /// decoded lazily here, so large raw payloads are freed as rows are
188    /// produced rather than held alongside the typed `Vec<Row>` throughout
189    /// the caller's query call.
190    ///
191    /// For very large result sets, consider iterating with the stream
192    /// instead.
193    pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
194        let mut out = Vec::with_capacity(self.rows.len());
195        while let Some(pending) = self.rows.pop_front() {
196            out.push(self.decode(pending)?);
197        }
198        self.finished = true;
199        Ok(out)
200    }
201
202    /// Try to get the next row synchronously (without async).
203    ///
204    /// Returns `None` when no more rows are available or the next pending
205    /// row fails to decode. Use [`Iterator::next`] instead if you need to
206    /// observe decode errors.
207    pub fn try_next(&mut self) -> Option<Row> {
208        self.next().and_then(|r| r.ok())
209    }
210
211    /// Decode a pending row into a typed [`Row`].
212    fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
213        match pending {
214            PendingRow::Parsed(row) => Ok(row),
215            PendingRow::Raw(raw) => {
216                let meta = self
217                    .meta
218                    .as_ref()
219                    .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
220                #[cfg(feature = "always-encrypted")]
221                if let Some(ref dec) = self.decryptor {
222                    return crate::column_parser::convert_raw_row_decrypted(
223                        &raw,
224                        meta,
225                        &self.row_meta,
226                        dec,
227                    );
228                }
229                crate::column_parser::convert_raw_row(&raw, meta, &self.row_meta)
230            }
231            PendingRow::Nbc(nbc) => {
232                let meta = self
233                    .meta
234                    .as_ref()
235                    .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
236                #[cfg(feature = "always-encrypted")]
237                if let Some(ref dec) = self.decryptor {
238                    return crate::column_parser::convert_nbc_row_decrypted(
239                        &nbc,
240                        meta,
241                        &self.row_meta,
242                        dec,
243                    );
244                }
245                crate::column_parser::convert_nbc_row(&nbc, meta, &self.row_meta)
246            }
247        }
248    }
249}
250
251impl Stream for QueryStream<'_> {
252    type Item = Result<Row, Error>;
253
254    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
255        let this = self.get_mut();
256
257        if this.finished {
258            return Poll::Ready(None);
259        }
260
261        match this.rows.pop_front() {
262            Some(pending) => Poll::Ready(Some(this.decode(pending))),
263            None => {
264                this.finished = true;
265                Poll::Ready(None)
266            }
267        }
268    }
269}
270
271impl ExactSizeIterator for QueryStream<'_> {}
272
273impl Iterator for QueryStream<'_> {
274    type Item = Result<Row, Error>;
275
276    fn next(&mut self) -> Option<Self::Item> {
277        if self.finished {
278            return None;
279        }
280
281        match self.rows.pop_front() {
282            Some(pending) => Some(self.decode(pending)),
283            None => {
284                self.finished = true;
285                None
286            }
287        }
288    }
289
290    fn size_hint(&self) -> (usize, Option<usize>) {
291        let remaining = self.rows.len();
292        (remaining, Some(remaining))
293    }
294}
295
296/// Result of a non-query execution.
297///
298/// Contains the number of affected rows and any output parameters.
299#[derive(Debug, Clone)]
300#[non_exhaustive]
301#[must_use]
302pub struct ExecuteResult {
303    /// Number of rows affected by the statement.
304    pub rows_affected: u64,
305    /// Output parameters from stored procedures.
306    pub output_params: Vec<OutputParam>,
307}
308
309/// An output parameter from a stored procedure call.
310#[derive(Debug, Clone)]
311#[non_exhaustive]
312pub struct OutputParam {
313    /// Parameter name.
314    pub name: String,
315    /// Parameter value.
316    pub value: mssql_types::SqlValue,
317}
318
319impl ExecuteResult {
320    /// Create a new execute result.
321    pub fn new(rows_affected: u64) -> Self {
322        Self {
323            rows_affected,
324            output_params: Vec::new(),
325        }
326    }
327
328    /// Create a result with output parameters.
329    pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
330        Self {
331            rows_affected,
332            output_params,
333        }
334    }
335
336    /// Get an output parameter by name.
337    #[must_use]
338    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
339        self.output_params
340            .iter()
341            .find(|p| p.name.eq_ignore_ascii_case(name))
342    }
343}
344
345/// Result of a stored procedure execution.
346///
347/// Contains the return value, affected row count, output parameters,
348/// and any result sets produced by the procedure.
349///
350/// # Example
351///
352/// ```rust,no_run
353/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
354/// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
355///
356/// // Check the return value (RETURN statement in the proc)
357/// assert_eq!(result.return_value, 0);
358///
359/// // Process result sets
360/// for mut rs in result.result_sets {
361///     while let Some(row) = rs.next_row() {
362///         let row = row?;
363///         println!("{:?}", row);
364///     }
365/// }
366/// # Ok(())
367/// # }
368/// ```
369#[derive(Debug, Clone)]
370#[non_exhaustive]
371#[must_use]
372pub struct ProcedureResult {
373    /// Return value from the stored procedure's RETURN statement.
374    ///
375    /// Defaults to 0 if the procedure does not explicitly return a value,
376    /// which matches SQL Server's default behavior.
377    pub return_value: i32,
378    /// Total number of rows affected by statements within the procedure.
379    pub rows_affected: u64,
380    /// Output parameters returned by the procedure.
381    pub output_params: Vec<OutputParam>,
382    /// Result sets produced by SELECT statements within the procedure.
383    pub result_sets: Vec<ResultSet>,
384}
385
386impl ProcedureResult {
387    /// Create a new empty procedure result.
388    pub(crate) fn new() -> Self {
389        Self {
390            return_value: 0,
391            rows_affected: 0,
392            output_params: Vec::new(),
393            result_sets: Vec::new(),
394        }
395    }
396
397    /// Get the return value from the stored procedure.
398    ///
399    /// This is the value from the procedure's `RETURN` statement.
400    /// Defaults to 0 if not explicitly set by the procedure.
401    #[must_use]
402    pub fn get_return_value(&self) -> i32 {
403        self.return_value
404    }
405
406    /// Get an output parameter by name (case-insensitive).
407    ///
408    /// Strips the `@` prefix from both the search name and stored names
409    /// before comparing, so `get_output("result")` and `get_output("@result")`
410    /// are equivalent.
411    ///
412    /// # Example
413    ///
414    /// ```rust,no_run
415    /// # use mssql_client::SqlValue;
416    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
417    /// let result = client.procedure("dbo.CalculateSum")?
418    ///     .input("@a", &10i32)
419    ///     .input("@b", &20i32)
420    ///     .output_int("@result")
421    ///     .execute().await?;
422    ///
423    /// let output = result.get_output("@result").expect("output param exists");
424    /// assert_eq!(output.value, SqlValue::Int(30));
425    /// # Ok(())
426    /// # }
427    /// ```
428    #[must_use]
429    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
430        let search = name.strip_prefix('@').unwrap_or(name);
431        self.output_params.iter().find(|p| {
432            let stored = p.name.strip_prefix('@').unwrap_or(&p.name);
433            stored.eq_ignore_ascii_case(search)
434        })
435    }
436
437    /// Get the first result set, if any.
438    ///
439    /// Convenience method for procedures that return a single result set.
440    #[must_use]
441    pub fn first_result_set(&self) -> Option<&ResultSet> {
442        self.result_sets.first()
443    }
444
445    /// Check if the procedure produced any result sets.
446    #[must_use]
447    pub fn has_result_sets(&self) -> bool {
448        !self.result_sets.is_empty()
449    }
450}
451
452/// A single result set within a multi-result batch.
453///
454/// Rows are stored as `PendingRow` values that may be either already-decoded
455/// [`Row`]s (eager path, used by tests and direct construction) or raw TDS
456/// bytes (lazy path, used by [`crate::Client::call_procedure`] and
457/// [`crate::Client::query_multiple`]). Decoding happens on pull, so per-row
458/// decode errors surface through [`ResultSet::next_row`] and
459/// [`ResultSet::collect_all`].
460#[derive(Debug, Clone)]
461#[must_use]
462pub struct ResultSet {
463    /// Column metadata for this result set.
464    row_meta: Arc<crate::row::ColMetaData>,
465    /// Pending rows — either pre-parsed or raw TDS bytes awaiting decode.
466    pending_rows: VecDeque<PendingRow>,
467    /// Protocol metadata required to decode raw rows. `None` when every
468    /// pending row is already [`PendingRow::Parsed`] (eager path).
469    meta: Option<ColMetaData>,
470    /// Pre-resolved column decryptor for Always Encrypted result sets.
471    ///
472    /// Wrapped in `Arc` so cloning a [`ResultSet`] stays cheap (clones share
473    /// the underlying decryptor state instead of duplicating derived keys).
474    #[cfg(feature = "always-encrypted")]
475    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
476}
477
478impl ResultSet {
479    /// Create a new result set from already-decoded rows.
480    ///
481    /// This is the eager constructor used by tests and callers that already
482    /// hold typed [`Row`]s. Production query paths use `ResultSet::from_raw`
483    /// (private) to defer row decoding.
484    pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
485        Self {
486            row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
487            pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
488            meta: None,
489            #[cfg(feature = "always-encrypted")]
490            decryptor: None,
491        }
492    }
493
494    /// Create a result set from raw row bytes and protocol metadata.
495    ///
496    /// Rows are decoded on demand as the caller pulls them via
497    /// [`ResultSet::next_row`] or [`ResultSet::collect_all`]. The `meta` must
498    /// describe every row in `pending`. If decryption is configured,
499    /// `decryptor` must cover the same column set.
500    pub(crate) fn from_raw(
501        columns: Vec<Column>,
502        pending: Vec<PendingRow>,
503        meta: ColMetaData,
504        #[cfg(feature = "always-encrypted")] decryptor: Option<
505            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
506        >,
507    ) -> Self {
508        Self {
509            row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
510            pending_rows: pending.into(),
511            meta: Some(meta),
512            #[cfg(feature = "always-encrypted")]
513            decryptor,
514        }
515    }
516
517    /// Get the column metadata.
518    #[must_use]
519    pub fn columns(&self) -> &[Column] {
520        &self.row_meta.columns
521    }
522
523    /// Get the number of rows remaining.
524    #[must_use]
525    pub fn rows_remaining(&self) -> usize {
526        self.pending_rows.len()
527    }
528
529    /// Get the next row from this result set.
530    ///
531    /// Returns `None` when no more rows remain, or `Some(Err(_))` when the
532    /// next pending row fails to decode. The stream is not short-circuited
533    /// on decode error — the caller may continue to pull subsequent rows.
534    pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
535        self.pending_rows.pop_front().map(|p| self.decode(p))
536    }
537
538    /// Check if this result set is empty.
539    #[must_use]
540    pub fn is_empty(&self) -> bool {
541        self.pending_rows.is_empty()
542    }
543
544    /// Collect all remaining rows into a vector.
545    ///
546    /// Stops at the first decode error and returns it.
547    pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
548        let mut out = Vec::with_capacity(self.pending_rows.len());
549        while let Some(pending) = self.pending_rows.pop_front() {
550            out.push(self.decode(pending)?);
551        }
552        Ok(out)
553    }
554
555    /// Decode a pending row into a typed [`Row`].
556    fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
557        match pending {
558            PendingRow::Parsed(row) => Ok(row),
559            PendingRow::Raw(raw) => {
560                let meta = self
561                    .meta
562                    .as_ref()
563                    .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
564                #[cfg(feature = "always-encrypted")]
565                if let Some(ref dec) = self.decryptor {
566                    return crate::column_parser::convert_raw_row_decrypted(
567                        &raw,
568                        meta,
569                        &self.row_meta,
570                        dec,
571                    );
572                }
573                crate::column_parser::convert_raw_row(&raw, meta, &self.row_meta)
574            }
575            PendingRow::Nbc(nbc) => {
576                let meta = self
577                    .meta
578                    .as_ref()
579                    .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
580                #[cfg(feature = "always-encrypted")]
581                if let Some(ref dec) = self.decryptor {
582                    return crate::column_parser::convert_nbc_row_decrypted(
583                        &nbc,
584                        meta,
585                        &self.row_meta,
586                        dec,
587                    );
588                }
589                crate::column_parser::convert_nbc_row(&nbc, meta, &self.row_meta)
590            }
591        }
592    }
593
594    /// Consume this result set and produce a [`QueryStream`] that carries the
595    /// same pending rows and decode state.
596    ///
597    /// Used by [`MultiResultStream::into_query_streams`] — avoids eagerly
598    /// materializing rows when the caller wants stream-level ergonomics.
599    fn into_query_stream<'a>(self) -> QueryStream<'a> {
600        QueryStream {
601            row_meta: self.row_meta,
602            rows: self.pending_rows,
603            meta: self.meta,
604            #[cfg(feature = "always-encrypted")]
605            decryptor: self.decryptor,
606            finished: false,
607            _marker: std::marker::PhantomData,
608        }
609    }
610}
611
612/// Multiple result sets from a batch or stored procedure.
613///
614/// Some queries return multiple result sets (e.g., stored procedures
615/// with multiple SELECT statements, or batches with multiple queries).
616///
617/// # Example
618///
619/// ```rust,no_run
620/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
621/// // Execute a batch with multiple SELECT statements
622/// let mut results = client.query_multiple("SELECT 1 AS a; SELECT 2 AS b, 3 AS c;", &[]).await?;
623///
624/// // Process first result set
625/// while let Some(row) = results.next_row().await? {
626///     println!("Result 1: {:?}", row);
627/// }
628///
629/// // Move to second result set
630/// if results.next_result().await? {
631///     while let Some(row) = results.next_row().await? {
632///         println!("Result 2: {:?}", row);
633///     }
634/// }
635/// # Ok(())
636/// # }
637/// ```
638#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
639pub struct MultiResultStream<'a> {
640    /// All result sets from the batch.
641    result_sets: Vec<ResultSet>,
642    /// Current result set index (0-based).
643    current_result: usize,
644    /// Lifetime tied to the connection.
645    _marker: std::marker::PhantomData<&'a ()>,
646}
647
648impl<'a> MultiResultStream<'a> {
649    /// Create a new multi-result stream from parsed result sets.
650    pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
651        Self {
652            result_sets,
653            current_result: 0,
654            _marker: std::marker::PhantomData,
655        }
656    }
657
658    /// Create an empty multi-result stream.
659    #[allow(dead_code)]
660    pub(crate) fn empty() -> Self {
661        Self {
662            result_sets: Vec::new(),
663            current_result: 0,
664            _marker: std::marker::PhantomData,
665        }
666    }
667
668    /// Get the current result set index (0-based).
669    #[must_use]
670    pub fn current_result_index(&self) -> usize {
671        self.current_result
672    }
673
674    /// Get the total number of result sets.
675    #[must_use]
676    pub fn result_count(&self) -> usize {
677        self.result_sets.len()
678    }
679
680    /// Check if there are more result sets after the current one.
681    #[must_use]
682    pub fn has_more_results(&self) -> bool {
683        self.current_result + 1 < self.result_sets.len()
684    }
685
686    /// Get the column metadata for the current result set.
687    ///
688    /// Returns `None` if there are no result sets or we've moved past all of them.
689    #[must_use]
690    pub fn columns(&self) -> Option<&[Column]> {
691        self.result_sets
692            .get(self.current_result)
693            .map(|rs| rs.columns())
694    }
695
696    /// Move to the next result set.
697    ///
698    /// Returns `true` if there is another result set, `false` if no more.
699    pub async fn next_result(&mut self) -> Result<bool, Error> {
700        if self.current_result + 1 < self.result_sets.len() {
701            self.current_result += 1;
702            Ok(true)
703        } else {
704            Ok(false)
705        }
706    }
707
708    /// Get the next row from the current result set.
709    ///
710    /// Returns `None` when no more rows in the current result set.
711    /// Call `next_result()` to move to the next result set.
712    ///
713    /// Per-row decode errors (from lazy row decoding) surface here as
714    /// `Err(_)`. Pre-2.9 this reader decoded rows eagerly and decode errors
715    /// surfaced at `query_multiple().await?` instead.
716    pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
717        if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
718            result_set.next_row().transpose()
719        } else {
720            Ok(None)
721        }
722    }
723
724    /// Get a mutable reference to the current result set.
725    #[must_use]
726    pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
727        self.result_sets.get_mut(self.current_result)
728    }
729
730    /// Collect all rows from the current result set.
731    ///
732    /// Returns `Ok(vec![])` if the current result index is out of range
733    /// (e.g., all result sets have been consumed). Propagates decode errors
734    /// from the underlying lazy row parser.
735    pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
736        match self.result_sets.get_mut(self.current_result) {
737            Some(rs) => rs.collect_all(),
738            None => Ok(Vec::new()),
739        }
740    }
741
742    /// Consume the stream and return all result sets as `QueryStream`s.
743    pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
744        self.result_sets
745            .into_iter()
746            .map(ResultSet::into_query_stream)
747            .collect()
748    }
749}
750
751#[cfg(test)]
752#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
753mod tests {
754    use super::*;
755
756    #[test]
757    fn test_execute_result() {
758        let result = ExecuteResult::new(42);
759        assert_eq!(result.rows_affected, 42);
760        assert!(result.output_params.is_empty());
761    }
762
763    #[test]
764    fn test_procedure_result_defaults() {
765        let result = ProcedureResult::new();
766        assert_eq!(result.return_value, 0);
767        assert_eq!(result.rows_affected, 0);
768        assert!(result.output_params.is_empty());
769        assert!(result.result_sets.is_empty());
770        assert!(!result.has_result_sets());
771        assert!(result.first_result_set().is_none());
772    }
773
774    #[test]
775    fn test_procedure_result_get_output() {
776        let mut result = ProcedureResult::new();
777        result.output_params.push(OutputParam {
778            name: "@Total".to_string(),
779            value: mssql_types::SqlValue::Int(42),
780        });
781        result.output_params.push(OutputParam {
782            name: "@Message".to_string(),
783            value: mssql_types::SqlValue::String("ok".to_string()),
784        });
785
786        // Exact match (case-insensitive)
787        assert!(result.get_output("@Total").is_some());
788        assert!(result.get_output("@total").is_some());
789        assert!(result.get_output("@TOTAL").is_some());
790
791        // @ prefix stripping
792        assert!(result.get_output("Total").is_some());
793        assert!(result.get_output("total").is_some());
794
795        // Non-existent
796        assert!(result.get_output("@NotHere").is_none());
797        assert!(result.get_output("NotHere").is_none());
798    }
799
800    #[test]
801    fn test_procedure_result_with_result_sets() {
802        use mssql_types::SqlValue;
803
804        let columns = vec![Column {
805            name: "id".to_string(),
806            index: 0,
807            type_name: "INT".to_string(),
808            nullable: false,
809            max_length: Some(4),
810            precision: None,
811            scale: None,
812            collation: None,
813        }];
814        let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
815        let rs = ResultSet::new(columns, rows);
816
817        let mut result = ProcedureResult::new();
818        result.result_sets.push(rs);
819        result.return_value = 7;
820        result.rows_affected = 5;
821
822        assert!(result.has_result_sets());
823        assert_eq!(result.get_return_value(), 7);
824        assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
825    }
826
827    #[test]
828    fn test_execute_result_with_outputs() {
829        let outputs = vec![OutputParam {
830            name: "ReturnValue".to_string(),
831            value: mssql_types::SqlValue::Int(100),
832        }];
833
834        let result = ExecuteResult::with_outputs(10, outputs);
835        assert_eq!(result.rows_affected, 10);
836        assert!(result.get_output("ReturnValue").is_some());
837        assert!(result.get_output("returnvalue").is_some()); // case-insensitive
838        assert!(result.get_output("NotFound").is_none());
839    }
840
841    #[test]
842    fn test_query_stream_columns() {
843        let columns = vec![Column {
844            name: "id".to_string(),
845            index: 0,
846            type_name: "INT".to_string(),
847            nullable: false,
848            max_length: Some(4),
849            precision: Some(0),
850            scale: Some(0),
851            collation: None,
852        }];
853
854        let stream = QueryStream::new(columns, Vec::new());
855        assert_eq!(stream.columns().len(), 1);
856        assert_eq!(stream.columns()[0].name, "id");
857        assert!(!stream.is_finished());
858    }
859
860    #[test]
861    fn test_query_stream_with_rows() {
862        use mssql_types::SqlValue;
863
864        let columns = vec![
865            Column {
866                name: "id".to_string(),
867                index: 0,
868                type_name: "INT".to_string(),
869                nullable: false,
870                max_length: Some(4),
871                precision: None,
872                scale: None,
873                collation: None,
874            },
875            Column {
876                name: "name".to_string(),
877                index: 1,
878                type_name: "NVARCHAR".to_string(),
879                nullable: true,
880                max_length: Some(100),
881                precision: None,
882                scale: None,
883                collation: None,
884            },
885        ];
886
887        let rows = vec![
888            Row::from_values(
889                columns.clone(),
890                vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
891            ),
892            Row::from_values(
893                columns.clone(),
894                vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
895            ),
896        ];
897
898        let mut stream = QueryStream::new(columns, rows);
899        assert_eq!(stream.columns().len(), 2);
900        assert_eq!(stream.rows_remaining(), 2);
901        assert!(!stream.is_finished());
902
903        // First row
904        let row1 = stream.try_next().unwrap();
905        assert_eq!(row1.get::<i32>(0).unwrap(), 1);
906        assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
907
908        // Second row
909        let row2 = stream.try_next().unwrap();
910        assert_eq!(row2.get::<i32>(0).unwrap(), 2);
911        assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
912
913        // No more rows
914        assert!(stream.try_next().is_none());
915        assert!(stream.is_finished());
916    }
917
918    #[test]
919    fn test_query_stream_iterator() {
920        use mssql_types::SqlValue;
921
922        let columns = vec![Column {
923            name: "val".to_string(),
924            index: 0,
925            type_name: "INT".to_string(),
926            nullable: false,
927            max_length: None,
928            precision: None,
929            scale: None,
930            collation: None,
931        }];
932
933        let rows = vec![
934            Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
935            Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
936            Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
937        ];
938
939        let mut stream = QueryStream::new(columns, rows);
940
941        // Use iterator — unwrap each Result so test failures are visible
942        // (QueryStream's Iterator impl always yields Ok, but we should
943        // not silently swallow errors if that ever changes)
944        let values: Vec<i32> = stream
945            .by_ref()
946            .map(|r| r.unwrap().get::<i32>(0).unwrap())
947            .collect();
948
949        assert_eq!(values, vec![10, 20, 30]);
950        assert!(stream.is_finished());
951    }
952
953    #[test]
954    fn test_query_stream_empty() {
955        let stream = QueryStream::empty();
956        assert!(stream.columns().is_empty());
957        assert_eq!(stream.rows_remaining(), 0);
958        assert!(stream.is_finished());
959    }
960
961    /// Exercises the lazy-decode path: rows are stored as raw TDS bytes and
962    /// decoded only when the caller pulls them. Mirrors what
963    /// `read_query_response` now produces and pins the contract between
964    /// `PendingRow::Raw` and the per-row decode in `poll_next`/`next`.
965    #[test]
966    fn test_query_stream_lazy_raw_row_decoding() {
967        use bytes::Bytes;
968        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
969        use tds_protocol::types::TypeId;
970
971        // Build raw row bytes for two columns: IntN(42) + IntN(NULL).
972        let mut data = Vec::new();
973        data.push(4); // IntN length prefix — 4 bytes
974        data.extend_from_slice(&42i32.to_le_bytes());
975        data.push(0); // IntN NULL (zero-length)
976
977        let meta = ColMetaData {
978            columns: vec![
979                ColumnData {
980                    name: "a".to_string(),
981                    type_id: TypeId::IntN,
982                    col_type: 0x26,
983                    flags: 0x00,
984                    user_type: 0,
985                    type_info: TypeInfo {
986                        max_length: Some(4),
987                        precision: None,
988                        scale: None,
989                        collation: None,
990                    },
991                    crypto_metadata: None,
992                },
993                ColumnData {
994                    name: "b".to_string(),
995                    type_id: TypeId::IntN,
996                    col_type: 0x26,
997                    flags: 0x01,
998                    user_type: 0,
999                    type_info: TypeInfo {
1000                        max_length: Some(4),
1001                        precision: None,
1002                        scale: None,
1003                        collation: None,
1004                    },
1005                    crypto_metadata: None,
1006                },
1007            ],
1008            cek_table: None,
1009        };
1010
1011        let columns = vec![
1012            Column {
1013                name: "a".to_string(),
1014                index: 0,
1015                type_name: "INT".to_string(),
1016                nullable: false,
1017                max_length: Some(4),
1018                precision: None,
1019                scale: None,
1020                collation: None,
1021            },
1022            Column {
1023                name: "b".to_string(),
1024                index: 1,
1025                type_name: "INT".to_string(),
1026                nullable: true,
1027                max_length: Some(4),
1028                precision: None,
1029                scale: None,
1030                collation: None,
1031            },
1032        ];
1033
1034        let pending = vec![PendingRow::Raw(RawRow {
1035            data: Bytes::from(data),
1036        })];
1037
1038        #[cfg(feature = "always-encrypted")]
1039        let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1040        #[cfg(not(feature = "always-encrypted"))]
1041        let mut stream = QueryStream::from_raw(columns, pending, meta);
1042
1043        assert_eq!(stream.rows_remaining(), 1);
1044        let row = stream
1045            .next()
1046            .expect("one row pending")
1047            .expect("row decoded successfully");
1048        assert_eq!(row.get::<i32>(0).unwrap(), 42);
1049        assert!(row.is_null(1));
1050        assert!(stream.next().is_none());
1051        assert!(stream.is_finished());
1052    }
1053
1054    /// Decoder errors must surface per-row via `Stream`/`Iterator` without
1055    /// derailing the stream state. Truncated raw bytes trigger a decode
1056    /// error that the caller observes as `Some(Err(_))`.
1057    #[test]
1058    fn test_query_stream_lazy_decode_error_propagates() {
1059        use bytes::Bytes;
1060        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1061        use tds_protocol::types::TypeId;
1062
1063        // Declare an Int4 column but provide only 2 bytes — decode must fail.
1064        let data = vec![0x01u8, 0x02];
1065
1066        let meta = ColMetaData {
1067            columns: vec![ColumnData {
1068                name: "a".to_string(),
1069                type_id: TypeId::Int4,
1070                col_type: 0x38,
1071                flags: 0x00,
1072                user_type: 0,
1073                type_info: TypeInfo {
1074                    max_length: Some(4),
1075                    precision: None,
1076                    scale: None,
1077                    collation: None,
1078                },
1079                crypto_metadata: None,
1080            }],
1081            cek_table: None,
1082        };
1083
1084        let columns = vec![Column {
1085            name: "a".to_string(),
1086            index: 0,
1087            type_name: "INT".to_string(),
1088            nullable: false,
1089            max_length: Some(4),
1090            precision: None,
1091            scale: None,
1092            collation: None,
1093        }];
1094
1095        let pending = vec![PendingRow::Raw(RawRow {
1096            data: Bytes::from(data),
1097        })];
1098
1099        #[cfg(feature = "always-encrypted")]
1100        let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1101        #[cfg(not(feature = "always-encrypted"))]
1102        let mut stream = QueryStream::from_raw(columns, pending, meta);
1103
1104        let item = stream.next().expect("pending row present");
1105        assert!(item.is_err(), "truncated bytes must surface a decode error");
1106        assert!(stream.next().is_none());
1107    }
1108
1109    /// Helper to build a single-column IntN metadata block for the lazy
1110    /// `ResultSet` / `MultiResultStream` tests below.
1111    #[cfg(test)]
1112    fn intn_meta_and_columns(
1113        col_name: &str,
1114        nullable: bool,
1115    ) -> (tds_protocol::token::ColMetaData, Vec<Column>) {
1116        use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1117        use tds_protocol::types::TypeId;
1118        (
1119            ColMetaData {
1120                columns: vec![ColumnData {
1121                    name: col_name.to_string(),
1122                    type_id: TypeId::IntN,
1123                    col_type: 0x26,
1124                    flags: if nullable { 0x01 } else { 0x00 },
1125                    user_type: 0,
1126                    type_info: TypeInfo {
1127                        max_length: Some(4),
1128                        precision: None,
1129                        scale: None,
1130                        collation: None,
1131                    },
1132                    crypto_metadata: None,
1133                }],
1134                cek_table: None,
1135            },
1136            vec![Column {
1137                name: col_name.to_string(),
1138                index: 0,
1139                type_name: "INT".to_string(),
1140                nullable,
1141                max_length: Some(4),
1142                precision: None,
1143                scale: None,
1144                collation: None,
1145            }],
1146        )
1147    }
1148
1149    /// Exercises the `ResultSet` lazy-decode path introduced in 2.9.
1150    /// Mirrors `test_query_stream_lazy_raw_row_decoding` but via the
1151    /// result-set API that `call_procedure` / `query_multiple` expose.
1152    #[test]
1153    fn test_result_set_lazy_raw_row_decoding() {
1154        use bytes::Bytes;
1155        use tds_protocol::token::RawRow;
1156
1157        let (meta, columns) = intn_meta_and_columns("a", false);
1158
1159        // Two rows: 7 and 11 encoded as IntN(4).
1160        let pending = vec![
1161            PendingRow::Raw(RawRow {
1162                data: {
1163                    let mut b = Vec::with_capacity(5);
1164                    b.push(4);
1165                    b.extend_from_slice(&7i32.to_le_bytes());
1166                    Bytes::from(b)
1167                },
1168            }),
1169            PendingRow::Raw(RawRow {
1170                data: {
1171                    let mut b = Vec::with_capacity(5);
1172                    b.push(4);
1173                    b.extend_from_slice(&11i32.to_le_bytes());
1174                    Bytes::from(b)
1175                },
1176            }),
1177        ];
1178
1179        #[cfg(feature = "always-encrypted")]
1180        let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1181        #[cfg(not(feature = "always-encrypted"))]
1182        let mut rs = ResultSet::from_raw(columns, pending, meta);
1183
1184        assert_eq!(rs.rows_remaining(), 2);
1185        assert!(!rs.is_empty());
1186
1187        let row1 = rs.next_row().expect("row present").expect("decodes");
1188        assert_eq!(row1.get::<i32>(0).unwrap(), 7);
1189
1190        let row2 = rs.next_row().expect("row present").expect("decodes");
1191        assert_eq!(row2.get::<i32>(0).unwrap(), 11);
1192
1193        assert!(rs.next_row().is_none());
1194        assert!(rs.is_empty());
1195    }
1196
1197    /// Decoder errors in `ResultSet::next_row` must surface per-row without
1198    /// derailing further calls. Same contract as
1199    /// `test_query_stream_lazy_decode_error_propagates`.
1200    #[test]
1201    fn test_result_set_lazy_decode_error_propagates() {
1202        use bytes::Bytes;
1203        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1204        use tds_protocol::types::TypeId;
1205
1206        // Int4 (not IntN) with only 2 bytes → decode must fail.
1207        let meta = ColMetaData {
1208            columns: vec![ColumnData {
1209                name: "a".to_string(),
1210                type_id: TypeId::Int4,
1211                col_type: 0x38,
1212                flags: 0x00,
1213                user_type: 0,
1214                type_info: TypeInfo {
1215                    max_length: Some(4),
1216                    precision: None,
1217                    scale: None,
1218                    collation: None,
1219                },
1220                crypto_metadata: None,
1221            }],
1222            cek_table: None,
1223        };
1224        let columns = vec![Column {
1225            name: "a".to_string(),
1226            index: 0,
1227            type_name: "INT".to_string(),
1228            nullable: false,
1229            max_length: Some(4),
1230            precision: None,
1231            scale: None,
1232            collation: None,
1233        }];
1234
1235        let pending = vec![PendingRow::Raw(RawRow {
1236            data: Bytes::from(vec![0x01u8, 0x02]),
1237        })];
1238
1239        #[cfg(feature = "always-encrypted")]
1240        let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1241        #[cfg(not(feature = "always-encrypted"))]
1242        let mut rs = ResultSet::from_raw(columns, pending, meta);
1243
1244        let first = rs.next_row().expect("pending row present");
1245        assert!(
1246            first.is_err(),
1247            "truncated bytes must surface a decode error"
1248        );
1249        assert!(rs.next_row().is_none());
1250    }
1251
1252    /// `collect_all` on a lazy `ResultSet` decodes every pending row and
1253    /// propagates the first decode error. Ensures 2.9's signature change is
1254    /// exercised end-to-end.
1255    #[test]
1256    fn test_result_set_lazy_collect_all_success_and_error() {
1257        use bytes::Bytes;
1258        use tds_protocol::token::RawRow;
1259
1260        // Success: two rows decode cleanly.
1261        let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
1262        let pending_ok = vec![
1263            PendingRow::Raw(RawRow {
1264                data: {
1265                    let mut b = Vec::with_capacity(5);
1266                    b.push(4);
1267                    b.extend_from_slice(&10i32.to_le_bytes());
1268                    Bytes::from(b)
1269                },
1270            }),
1271            PendingRow::Raw(RawRow {
1272                data: {
1273                    let mut b = Vec::with_capacity(5);
1274                    b.push(4);
1275                    b.extend_from_slice(&20i32.to_le_bytes());
1276                    Bytes::from(b)
1277                },
1278            }),
1279        ];
1280
1281        #[cfg(feature = "always-encrypted")]
1282        let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok, None);
1283        #[cfg(not(feature = "always-encrypted"))]
1284        let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok);
1285        let rows = rs_ok.collect_all().expect("all rows decode");
1286        assert_eq!(rows.len(), 2);
1287        assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
1288        assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
1289        assert!(rs_ok.is_empty());
1290
1291        // Error: a truncated row (declared Int4 with only 2 bytes) makes
1292        // collect_all fail. collect_all short-circuits on the first Err.
1293        use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1294        use tds_protocol::types::TypeId;
1295        let meta_err = ColMetaData {
1296            columns: vec![ColumnData {
1297                name: "a".to_string(),
1298                type_id: TypeId::Int4,
1299                col_type: 0x38,
1300                flags: 0x00,
1301                user_type: 0,
1302                type_info: TypeInfo {
1303                    max_length: Some(4),
1304                    precision: None,
1305                    scale: None,
1306                    collation: None,
1307                },
1308                crypto_metadata: None,
1309            }],
1310            cek_table: None,
1311        };
1312        let cols_err = vec![Column {
1313            name: "a".to_string(),
1314            index: 0,
1315            type_name: "INT".to_string(),
1316            nullable: false,
1317            max_length: Some(4),
1318            precision: None,
1319            scale: None,
1320            collation: None,
1321        }];
1322        let pending_err = vec![PendingRow::Raw(RawRow {
1323            data: Bytes::from(vec![0x01u8, 0x02]),
1324        })];
1325
1326        #[cfg(feature = "always-encrypted")]
1327        let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err, None);
1328        #[cfg(not(feature = "always-encrypted"))]
1329        let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err);
1330        let err = rs_err.collect_all();
1331        assert!(err.is_err(), "collect_all must propagate decode error");
1332    }
1333
1334    /// `MultiResultStream` end-to-end lazy-decode path: two lazy `ResultSet`s
1335    /// decoded on demand as the caller walks through via `next_row` /
1336    /// `next_result`. Pins the 2.9 refactor of `read_multi_result_response`.
1337    #[tokio::test]
1338    async fn test_multi_result_stream_lazy_decode_across_result_sets() {
1339        use bytes::Bytes;
1340        use tds_protocol::token::RawRow;
1341
1342        let (meta1, cols1) = intn_meta_and_columns("a", false);
1343        let pending1 = vec![PendingRow::Raw(RawRow {
1344            data: {
1345                let mut b = Vec::with_capacity(5);
1346                b.push(4);
1347                b.extend_from_slice(&101i32.to_le_bytes());
1348                Bytes::from(b)
1349            },
1350        })];
1351        #[cfg(feature = "always-encrypted")]
1352        let rs1 = ResultSet::from_raw(cols1, pending1, meta1, None);
1353        #[cfg(not(feature = "always-encrypted"))]
1354        let rs1 = ResultSet::from_raw(cols1, pending1, meta1);
1355
1356        let (meta2, cols2) = intn_meta_and_columns("b", false);
1357        let pending2 = vec![PendingRow::Raw(RawRow {
1358            data: {
1359                let mut b = Vec::with_capacity(5);
1360                b.push(4);
1361                b.extend_from_slice(&202i32.to_le_bytes());
1362                Bytes::from(b)
1363            },
1364        })];
1365        #[cfg(feature = "always-encrypted")]
1366        let rs2 = ResultSet::from_raw(cols2, pending2, meta2, None);
1367        #[cfg(not(feature = "always-encrypted"))]
1368        let rs2 = ResultSet::from_raw(cols2, pending2, meta2);
1369
1370        let mut stream = MultiResultStream::new(vec![rs1, rs2]);
1371        assert_eq!(stream.result_count(), 2);
1372        assert_eq!(stream.current_result_index(), 0);
1373
1374        let row = stream
1375            .next_row()
1376            .await
1377            .expect("first row success")
1378            .expect("row present");
1379        assert_eq!(row.get::<i32>(0).unwrap(), 101);
1380        assert!(stream.next_row().await.expect("no more rows").is_none());
1381
1382        assert!(stream.has_more_results());
1383        assert!(stream.next_result().await.expect("advance ok"));
1384        assert_eq!(stream.current_result_index(), 1);
1385
1386        let row = stream
1387            .next_row()
1388            .await
1389            .expect("second row success")
1390            .expect("row present");
1391        assert_eq!(row.get::<i32>(0).unwrap(), 202);
1392    }
1393}