Skip to main content

mssql_client/
stream.rs

1//! Query result sets with lazy per-row decoding.
2//!
3//! The full server response is buffered in memory first; rows are then
4//! *decoded* lazily as callers pull them. This is not incremental
5//! network streaming — see the next section for what that means for memory.
6//!
7//! ## Buffered vs True Streaming
8//!
9//! The underlying TDS response is reassembled into a single [`bytes::Bytes`]
10//! payload by [`mssql-codec`](mssql_codec). That payload is handed to the
11//! token parser which walks it once and enqueues each row's raw byte slice
12//! (a cheap, refcounted slice into the original [`bytes::Bytes`] per ADR-004)
13//! into the stream. Individual [`Row`]s are then decoded lazily when callers
14//! pull them — either via the [`Stream`]/[`Iterator`] impls or
15//! [`QueryStream::collect_all`].
16//!
17//! This "lazy decode" pattern keeps peak memory at roughly the size of the
18//! raw payload instead of payload + fully-typed `Vec<Row>`. Users who iterate
19//! and drop each [`Row`] see memory proportional to a single row at a time
20//! plus the shared raw payload. Users who `collect_all()` pay for the full
21//! `Vec<Row>` just like before.
22//!
23//! The same lazy-decode pattern applies to [`MultiResultStream`],
24//! [`ResultSet`], and [`ProcedureResult::result_sets`]: raw row bytes are
25//! stashed during response read and each [`Row`] is decoded when the caller
26//! pulls it. Because decoding can fail per row, [`ResultSet::next_row`]
27//! returns `Option<Result<Row, Error>>` rather than `Option<Row>` — callers
28//! observe decode errors at iteration time instead of at
29//! `call_procedure().await?` / `query_multiple().await?`.
30//!
31//! For truly large result sets, consider using OFFSET/FETCH pagination.
32
33use std::collections::VecDeque;
34use std::pin::Pin;
35use std::task::{Context, Poll};
36
37use futures_core::Stream;
38use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
39
40use crate::error::Error;
41use crate::row::{Column, Row};
42
/// A row that may be already decoded or still held as raw TDS bytes.
///
/// The lazy-parse query path enqueues raw rows (cheap `Bytes` slices into
/// the original response payload) and decodes them on demand. The eager
/// path — [`ResultSet::new`] and the test-only `QueryStream::new` — wraps
/// already-decoded rows instead.
///
/// Both [`QueryStream`] and [`ResultSet`] keep a queue of these and turn
/// each entry into a [`Row`] in their private `decode` methods.
#[derive(Debug, Clone)]
pub(crate) enum PendingRow {
    /// Already-decoded row (eager path); decoding hands it back unchanged
    /// and needs no protocol metadata.
    Parsed(Row),
    /// Raw TDS row bytes, to be decoded on pull. Decoding fails with a
    /// protocol error if the owning container holds no `ColMetaData`.
    Raw(RawRow),
    /// Null-bitmap-compressed row bytes, to be decoded on pull. Decoding
    /// fails with a protocol error if the owning container holds no
    /// `ColMetaData`.
    Nbc(NbcRow),
}
58
/// A result set from a query, yielding rows one at a time.
///
/// The complete server response is already buffered in memory by the time
/// this is returned; each [`Row`] is *decoded* lazily as it is pulled, not
/// fetched incrementally from the network. Peak memory is therefore roughly
/// the size of the raw response payload regardless of how you iterate. For
/// genuinely large result sets, page with `OFFSET`/`FETCH` in SQL rather
/// than relying on this type to bound memory.
///
/// Rows can be pulled through the [`Iterator`] impl, the [`Stream`] impl
/// (which is always immediately ready), or in bulk with
/// [`QueryStream::collect_all`]. Each pulled item is a
/// `Result<Row, Error>` because decoding can fail per row.
///
/// # Example
///
/// ```rust,no_run
/// # use mssql_client::Row;
/// # fn process_row(_: &Row) {}
/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
/// let stream = client.query("SELECT * FROM large_table", &[]).await?;
/// for row in stream {
///     let row = row?;
///     process_row(&row);
/// }
/// # Ok(())
/// # }
/// ```
#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
pub struct QueryStream<'a> {
    /// Column metadata for the result set.
    columns: Vec<Column>,
    /// Buffered rows (typed or raw) from the response, in pull order.
    rows: VecDeque<PendingRow>,
    /// Protocol metadata needed to decode raw rows. `None` if every pending
    /// row is already [`PendingRow::Parsed`]; decoding a raw or NBC row
    /// without it yields a protocol error.
    meta: Option<ColMetaData>,
    /// Pre-resolved column decryptor for Always Encrypted result sets.
    ///
    /// Wrapped in `Arc` so lazy result-set containers (`ResultSet`) can share
    /// this state without duplicating the derived-key material. When present,
    /// raw and NBC rows are decoded through the decrypting converters.
    #[cfg(feature = "always-encrypted")]
    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
    /// Whether the stream has completed. Set once a pull finds the buffer
    /// empty; an `empty()` stream starts out finished.
    finished: bool,
    /// Lifetime tied to the connection. Zero-sized; carries no data.
    _marker: std::marker::PhantomData<&'a ()>,
}
102
103impl QueryStream<'_> {
104    /// Create a new query stream from already-decoded rows.
105    ///
106    /// This is the eager constructor used by unit tests. Production query
107    /// paths use [`QueryStream::from_raw`] to defer row decoding.
108    /// [`MultiResultStream::into_query_streams`] uses
109    /// [`ResultSet::into_query_stream`] to preserve pending-row state.
110    #[cfg(test)]
111    pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
112        Self {
113            columns,
114            rows: rows.into_iter().map(PendingRow::Parsed).collect(),
115            meta: None,
116            #[cfg(feature = "always-encrypted")]
117            decryptor: None,
118            finished: false,
119            _marker: std::marker::PhantomData,
120        }
121    }
122
123    /// Create a query stream from raw row bytes and protocol metadata.
124    ///
125    /// Rows are decoded on demand as the stream is pulled. The `meta` must
126    /// describe every row in `pending`. If decryption is configured,
127    /// `decryptor` must cover the same column set.
128    pub(crate) fn from_raw(
129        columns: Vec<Column>,
130        pending: Vec<PendingRow>,
131        meta: ColMetaData,
132        #[cfg(feature = "always-encrypted")] decryptor: Option<
133            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
134        >,
135    ) -> Self {
136        Self {
137            columns,
138            rows: pending.into(),
139            meta: Some(meta),
140            #[cfg(feature = "always-encrypted")]
141            decryptor,
142            finished: false,
143            _marker: std::marker::PhantomData,
144        }
145    }
146
147    /// Create an empty query stream (no results).
148    #[allow(dead_code)]
149    pub(crate) fn empty() -> Self {
150        Self {
151            columns: Vec::new(),
152            rows: VecDeque::new(),
153            meta: None,
154            #[cfg(feature = "always-encrypted")]
155            decryptor: None,
156            finished: true,
157            _marker: std::marker::PhantomData,
158        }
159    }
160
161    /// Get the column metadata for this result set.
162    #[must_use]
163    pub fn columns(&self) -> &[Column] {
164        &self.columns
165    }
166
167    /// Check if the stream has finished.
168    #[must_use]
169    pub fn is_finished(&self) -> bool {
170        self.finished
171    }
172
173    /// Get the number of rows remaining in the buffer.
174    #[must_use]
175    pub fn rows_remaining(&self) -> usize {
176        self.rows.len()
177    }
178
179    /// Collect all remaining rows into a vector.
180    ///
181    /// This consumes the stream and loads all rows into memory. Each row is
182    /// decoded lazily here, so large raw payloads are freed as rows are
183    /// produced rather than held alongside the typed `Vec<Row>` throughout
184    /// the caller's query call.
185    ///
186    /// For very large result sets, consider iterating with the stream
187    /// instead.
188    pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
189        let mut out = Vec::with_capacity(self.rows.len());
190        while let Some(pending) = self.rows.pop_front() {
191            out.push(self.decode(pending)?);
192        }
193        self.finished = true;
194        Ok(out)
195    }
196
197    /// Try to get the next row synchronously (without async).
198    ///
199    /// Returns `None` when no more rows are available or the next pending
200    /// row fails to decode. Use [`Iterator::next`] instead if you need to
201    /// observe decode errors.
202    pub fn try_next(&mut self) -> Option<Row> {
203        self.next().and_then(|r| r.ok())
204    }
205
206    /// Decode a pending row into a typed [`Row`].
207    fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
208        match pending {
209            PendingRow::Parsed(row) => Ok(row),
210            PendingRow::Raw(raw) => {
211                let meta = self
212                    .meta
213                    .as_ref()
214                    .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
215                #[cfg(feature = "always-encrypted")]
216                if let Some(ref dec) = self.decryptor {
217                    return crate::column_parser::convert_raw_row_decrypted(
218                        &raw,
219                        meta,
220                        &self.columns,
221                        dec,
222                    );
223                }
224                crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
225            }
226            PendingRow::Nbc(nbc) => {
227                let meta = self
228                    .meta
229                    .as_ref()
230                    .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
231                #[cfg(feature = "always-encrypted")]
232                if let Some(ref dec) = self.decryptor {
233                    return crate::column_parser::convert_nbc_row_decrypted(
234                        &nbc,
235                        meta,
236                        &self.columns,
237                        dec,
238                    );
239                }
240                crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
241            }
242        }
243    }
244}
245
246impl Stream for QueryStream<'_> {
247    type Item = Result<Row, Error>;
248
249    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
250        let this = self.get_mut();
251
252        if this.finished {
253            return Poll::Ready(None);
254        }
255
256        match this.rows.pop_front() {
257            Some(pending) => Poll::Ready(Some(this.decode(pending))),
258            None => {
259                this.finished = true;
260                Poll::Ready(None)
261            }
262        }
263    }
264}
265
// `size_hint` reports the buffered row count as both its lower and upper
// bound, which is the exact-length contract this marker impl relies on.
impl ExactSizeIterator for QueryStream<'_> {}
267
268impl Iterator for QueryStream<'_> {
269    type Item = Result<Row, Error>;
270
271    fn next(&mut self) -> Option<Self::Item> {
272        if self.finished {
273            return None;
274        }
275
276        match self.rows.pop_front() {
277            Some(pending) => Some(self.decode(pending)),
278            None => {
279                self.finished = true;
280                None
281            }
282        }
283    }
284
285    fn size_hint(&self) -> (usize, Option<usize>) {
286        let remaining = self.rows.len();
287        (remaining, Some(remaining))
288    }
289}
290
/// Result of a non-query execution.
///
/// Contains the number of affected rows and any output parameters.
///
/// The type is `#[non_exhaustive]`, so code outside this crate builds it
/// with [`ExecuteResult::new`] or [`ExecuteResult::with_outputs`] rather
/// than a struct literal.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[must_use]
pub struct ExecuteResult {
    /// Number of rows affected by the statement.
    pub rows_affected: u64,
    /// Output parameters from stored procedures. Empty when the result was
    /// created with [`ExecuteResult::new`].
    pub output_params: Vec<OutputParam>,
}
303
/// An output parameter from a stored procedure call.
///
/// Returned inside [`ExecuteResult::output_params`] and
/// [`ProcedureResult::output_params`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct OutputParam {
    /// Parameter name. It may or may not carry a leading `@`;
    /// [`ProcedureResult::get_output`] tolerates both forms when matching.
    pub name: String,
    /// Parameter value.
    pub value: mssql_types::SqlValue,
}
313
314impl ExecuteResult {
315    /// Create a new execute result.
316    pub fn new(rows_affected: u64) -> Self {
317        Self {
318            rows_affected,
319            output_params: Vec::new(),
320        }
321    }
322
323    /// Create a result with output parameters.
324    pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
325        Self {
326            rows_affected,
327            output_params,
328        }
329    }
330
331    /// Get an output parameter by name.
332    #[must_use]
333    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
334        self.output_params
335            .iter()
336            .find(|p| p.name.eq_ignore_ascii_case(name))
337    }
338}
339
/// Result of a stored procedure execution.
///
/// Contains the return value, affected row count, output parameters,
/// and any result sets produced by the procedure.
///
/// Result sets decode their rows lazily, so `ResultSet::next_row` yields
/// `Result` items and decode errors surface while iterating (as in the
/// example below) rather than when the call completes.
///
/// # Example
///
/// ```rust,no_run
/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
/// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
///
/// // Check the return value (RETURN statement in the proc)
/// assert_eq!(result.return_value, 0);
///
/// // Process result sets
/// for mut rs in result.result_sets {
///     while let Some(row) = rs.next_row() {
///         let row = row?;
///         println!("{:?}", row);
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
#[non_exhaustive]
#[must_use]
pub struct ProcedureResult {
    /// Return value from the stored procedure's RETURN statement.
    ///
    /// Defaults to 0 if the procedure does not explicitly return a value,
    /// which matches SQL Server's default behavior.
    pub return_value: i32,
    /// Total number of rows affected by statements within the procedure.
    pub rows_affected: u64,
    /// Output parameters returned by the procedure. Look one up by name
    /// with [`ProcedureResult::get_output`].
    pub output_params: Vec<OutputParam>,
    /// Result sets produced by SELECT statements within the procedure.
    /// Iterating rows needs mutable (or owned) access to each set.
    pub result_sets: Vec<ResultSet>,
}
380
381impl ProcedureResult {
382    /// Create a new empty procedure result.
383    pub(crate) fn new() -> Self {
384        Self {
385            return_value: 0,
386            rows_affected: 0,
387            output_params: Vec::new(),
388            result_sets: Vec::new(),
389        }
390    }
391
392    /// Get the return value from the stored procedure.
393    ///
394    /// This is the value from the procedure's `RETURN` statement.
395    /// Defaults to 0 if not explicitly set by the procedure.
396    #[must_use]
397    pub fn get_return_value(&self) -> i32 {
398        self.return_value
399    }
400
401    /// Get an output parameter by name (case-insensitive).
402    ///
403    /// Strips the `@` prefix from both the search name and stored names
404    /// before comparing, so `get_output("result")` and `get_output("@result")`
405    /// are equivalent.
406    ///
407    /// # Example
408    ///
409    /// ```rust,no_run
410    /// # use mssql_client::SqlValue;
411    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
412    /// let result = client.procedure("dbo.CalculateSum")?
413    ///     .input("@a", &10i32)
414    ///     .input("@b", &20i32)
415    ///     .output_int("@result")
416    ///     .execute().await?;
417    ///
418    /// let output = result.get_output("@result").expect("output param exists");
419    /// assert_eq!(output.value, SqlValue::Int(30));
420    /// # Ok(())
421    /// # }
422    /// ```
423    #[must_use]
424    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
425        let search = name.strip_prefix('@').unwrap_or(name);
426        self.output_params.iter().find(|p| {
427            let stored = p.name.strip_prefix('@').unwrap_or(&p.name);
428            stored.eq_ignore_ascii_case(search)
429        })
430    }
431
432    /// Get the first result set, if any.
433    ///
434    /// Convenience method for procedures that return a single result set.
435    #[must_use]
436    pub fn first_result_set(&self) -> Option<&ResultSet> {
437        self.result_sets.first()
438    }
439
440    /// Check if the procedure produced any result sets.
441    #[must_use]
442    pub fn has_result_sets(&self) -> bool {
443        !self.result_sets.is_empty()
444    }
445}
446
/// A single result set within a multi-result batch.
///
/// Rows are stored as `PendingRow` values that may be either already-decoded
/// [`Row`]s (eager path, used by tests and direct construction) or raw TDS
/// bytes (lazy path, used by [`crate::Client::call_procedure`] and
/// [`crate::Client::query_multiple`]). Decoding happens on pull, so per-row
/// decode errors surface through [`ResultSet::next_row`] and
/// [`ResultSet::collect_all`].
///
/// Pulling a row removes it from the set, so both of those methods take
/// `&mut self`.
#[derive(Debug, Clone)]
#[must_use]
pub struct ResultSet {
    /// Column metadata for this result set.
    columns: Vec<Column>,
    /// Pending rows — either pre-parsed or raw TDS bytes awaiting decode —
    /// in the order they will be returned.
    pending_rows: VecDeque<PendingRow>,
    /// Protocol metadata required to decode raw rows. `None` when every
    /// pending row is already [`PendingRow::Parsed`] (eager path); decoding
    /// a raw or NBC row without it yields a protocol error.
    meta: Option<ColMetaData>,
    /// Pre-resolved column decryptor for Always Encrypted result sets.
    ///
    /// Wrapped in `Arc` so cloning a [`ResultSet`] stays cheap (clones share
    /// the underlying decryptor state instead of duplicating derived keys).
    #[cfg(feature = "always-encrypted")]
    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
}
472
473impl ResultSet {
474    /// Create a new result set from already-decoded rows.
475    ///
476    /// This is the eager constructor used by tests and callers that already
477    /// hold typed [`Row`]s. Production query paths use `ResultSet::from_raw`
478    /// (private) to defer row decoding.
479    pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
480        Self {
481            columns,
482            pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
483            meta: None,
484            #[cfg(feature = "always-encrypted")]
485            decryptor: None,
486        }
487    }
488
489    /// Create a result set from raw row bytes and protocol metadata.
490    ///
491    /// Rows are decoded on demand as the caller pulls them via
492    /// [`ResultSet::next_row`] or [`ResultSet::collect_all`]. The `meta` must
493    /// describe every row in `pending`. If decryption is configured,
494    /// `decryptor` must cover the same column set.
495    pub(crate) fn from_raw(
496        columns: Vec<Column>,
497        pending: Vec<PendingRow>,
498        meta: ColMetaData,
499        #[cfg(feature = "always-encrypted")] decryptor: Option<
500            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
501        >,
502    ) -> Self {
503        Self {
504            columns,
505            pending_rows: pending.into(),
506            meta: Some(meta),
507            #[cfg(feature = "always-encrypted")]
508            decryptor,
509        }
510    }
511
512    /// Get the column metadata.
513    #[must_use]
514    pub fn columns(&self) -> &[Column] {
515        &self.columns
516    }
517
518    /// Get the number of rows remaining.
519    #[must_use]
520    pub fn rows_remaining(&self) -> usize {
521        self.pending_rows.len()
522    }
523
524    /// Get the next row from this result set.
525    ///
526    /// Returns `None` when no more rows remain, or `Some(Err(_))` when the
527    /// next pending row fails to decode. The stream is not short-circuited
528    /// on decode error — the caller may continue to pull subsequent rows.
529    pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
530        self.pending_rows.pop_front().map(|p| self.decode(p))
531    }
532
533    /// Check if this result set is empty.
534    #[must_use]
535    pub fn is_empty(&self) -> bool {
536        self.pending_rows.is_empty()
537    }
538
539    /// Collect all remaining rows into a vector.
540    ///
541    /// Stops at the first decode error and returns it.
542    pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
543        let mut out = Vec::with_capacity(self.pending_rows.len());
544        while let Some(pending) = self.pending_rows.pop_front() {
545            out.push(self.decode(pending)?);
546        }
547        Ok(out)
548    }
549
550    /// Decode a pending row into a typed [`Row`].
551    fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
552        match pending {
553            PendingRow::Parsed(row) => Ok(row),
554            PendingRow::Raw(raw) => {
555                let meta = self
556                    .meta
557                    .as_ref()
558                    .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
559                #[cfg(feature = "always-encrypted")]
560                if let Some(ref dec) = self.decryptor {
561                    return crate::column_parser::convert_raw_row_decrypted(
562                        &raw,
563                        meta,
564                        &self.columns,
565                        dec,
566                    );
567                }
568                crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
569            }
570            PendingRow::Nbc(nbc) => {
571                let meta = self
572                    .meta
573                    .as_ref()
574                    .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
575                #[cfg(feature = "always-encrypted")]
576                if let Some(ref dec) = self.decryptor {
577                    return crate::column_parser::convert_nbc_row_decrypted(
578                        &nbc,
579                        meta,
580                        &self.columns,
581                        dec,
582                    );
583                }
584                crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
585            }
586        }
587    }
588
589    /// Consume this result set and produce a [`QueryStream`] that carries the
590    /// same pending rows and decode state.
591    ///
592    /// Used by [`MultiResultStream::into_query_streams`] — avoids eagerly
593    /// materializing rows when the caller wants stream-level ergonomics.
594    fn into_query_stream<'a>(self) -> QueryStream<'a> {
595        QueryStream {
596            columns: self.columns,
597            rows: self.pending_rows,
598            meta: self.meta,
599            #[cfg(feature = "always-encrypted")]
600            decryptor: self.decryptor,
601            finished: false,
602            _marker: std::marker::PhantomData,
603        }
604    }
605}
606
/// Multiple result sets from a batch or stored procedure.
///
/// Some queries return multiple result sets (e.g., stored procedures
/// with multiple SELECT statements, or batches with multiple queries).
///
/// All result sets are held in memory; a cursor (`current_result`) selects
/// which one `next_row`, `columns`, and `collect_current` operate on, and
/// `next_result` moves that cursor forward.
///
/// # Example
///
/// ```rust,no_run
/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
/// // Execute a batch with multiple SELECT statements
/// let mut results = client.query_multiple("SELECT 1 AS a; SELECT 2 AS b, 3 AS c;", &[]).await?;
///
/// // Process first result set
/// while let Some(row) = results.next_row().await? {
///     println!("Result 1: {:?}", row);
/// }
///
/// // Move to second result set
/// if results.next_result().await? {
///     while let Some(row) = results.next_row().await? {
///         println!("Result 2: {:?}", row);
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
pub struct MultiResultStream<'a> {
    /// All result sets from the batch, in order.
    result_sets: Vec<ResultSet>,
    /// Current result set index (0-based). Starts at 0 and is only ever
    /// advanced by `next_result`.
    current_result: usize,
    /// Lifetime tied to the connection. Zero-sized; carries no data.
    _marker: std::marker::PhantomData<&'a ()>,
}
642
643impl<'a> MultiResultStream<'a> {
644    /// Create a new multi-result stream from parsed result sets.
645    pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
646        Self {
647            result_sets,
648            current_result: 0,
649            _marker: std::marker::PhantomData,
650        }
651    }
652
653    /// Create an empty multi-result stream.
654    #[allow(dead_code)]
655    pub(crate) fn empty() -> Self {
656        Self {
657            result_sets: Vec::new(),
658            current_result: 0,
659            _marker: std::marker::PhantomData,
660        }
661    }
662
663    /// Get the current result set index (0-based).
664    #[must_use]
665    pub fn current_result_index(&self) -> usize {
666        self.current_result
667    }
668
669    /// Get the total number of result sets.
670    #[must_use]
671    pub fn result_count(&self) -> usize {
672        self.result_sets.len()
673    }
674
675    /// Check if there are more result sets after the current one.
676    #[must_use]
677    pub fn has_more_results(&self) -> bool {
678        self.current_result + 1 < self.result_sets.len()
679    }
680
681    /// Get the column metadata for the current result set.
682    ///
683    /// Returns `None` if there are no result sets or we've moved past all of them.
684    #[must_use]
685    pub fn columns(&self) -> Option<&[Column]> {
686        self.result_sets
687            .get(self.current_result)
688            .map(|rs| rs.columns())
689    }
690
691    /// Move to the next result set.
692    ///
693    /// Returns `true` if there is another result set, `false` if no more.
694    pub async fn next_result(&mut self) -> Result<bool, Error> {
695        if self.current_result + 1 < self.result_sets.len() {
696            self.current_result += 1;
697            Ok(true)
698        } else {
699            Ok(false)
700        }
701    }
702
703    /// Get the next row from the current result set.
704    ///
705    /// Returns `None` when no more rows in the current result set.
706    /// Call `next_result()` to move to the next result set.
707    ///
708    /// Per-row decode errors (from lazy row decoding) surface here as
709    /// `Err(_)`. Pre-2.9 this reader decoded rows eagerly and decode errors
710    /// surfaced at `query_multiple().await?` instead.
711    pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
712        if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
713            result_set.next_row().transpose()
714        } else {
715            Ok(None)
716        }
717    }
718
719    /// Get a mutable reference to the current result set.
720    #[must_use]
721    pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
722        self.result_sets.get_mut(self.current_result)
723    }
724
725    /// Collect all rows from the current result set.
726    ///
727    /// Returns `Ok(vec![])` if the current result index is out of range
728    /// (e.g., all result sets have been consumed). Propagates decode errors
729    /// from the underlying lazy row parser.
730    pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
731        match self.result_sets.get_mut(self.current_result) {
732            Some(rs) => rs.collect_all(),
733            None => Ok(Vec::new()),
734        }
735    }
736
737    /// Consume the stream and return all result sets as `QueryStream`s.
738    pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
739        self.result_sets
740            .into_iter()
741            .map(ResultSet::into_query_stream)
742            .collect()
743    }
744}
745
746#[cfg(test)]
747#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
748mod tests {
749    use super::*;
750
751    #[test]
752    fn test_execute_result() {
753        let result = ExecuteResult::new(42);
754        assert_eq!(result.rows_affected, 42);
755        assert!(result.output_params.is_empty());
756    }
757
758    #[test]
759    fn test_procedure_result_defaults() {
760        let result = ProcedureResult::new();
761        assert_eq!(result.return_value, 0);
762        assert_eq!(result.rows_affected, 0);
763        assert!(result.output_params.is_empty());
764        assert!(result.result_sets.is_empty());
765        assert!(!result.has_result_sets());
766        assert!(result.first_result_set().is_none());
767    }
768
769    #[test]
770    fn test_procedure_result_get_output() {
771        let mut result = ProcedureResult::new();
772        result.output_params.push(OutputParam {
773            name: "@Total".to_string(),
774            value: mssql_types::SqlValue::Int(42),
775        });
776        result.output_params.push(OutputParam {
777            name: "@Message".to_string(),
778            value: mssql_types::SqlValue::String("ok".to_string()),
779        });
780
781        // Exact match (case-insensitive)
782        assert!(result.get_output("@Total").is_some());
783        assert!(result.get_output("@total").is_some());
784        assert!(result.get_output("@TOTAL").is_some());
785
786        // @ prefix stripping
787        assert!(result.get_output("Total").is_some());
788        assert!(result.get_output("total").is_some());
789
790        // Non-existent
791        assert!(result.get_output("@NotHere").is_none());
792        assert!(result.get_output("NotHere").is_none());
793    }
794
795    #[test]
796    fn test_procedure_result_with_result_sets() {
797        use mssql_types::SqlValue;
798
799        let columns = vec![Column {
800            name: "id".to_string(),
801            index: 0,
802            type_name: "INT".to_string(),
803            nullable: false,
804            max_length: Some(4),
805            precision: None,
806            scale: None,
807            collation: None,
808        }];
809        let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
810        let rs = ResultSet::new(columns, rows);
811
812        let mut result = ProcedureResult::new();
813        result.result_sets.push(rs);
814        result.return_value = 7;
815        result.rows_affected = 5;
816
817        assert!(result.has_result_sets());
818        assert_eq!(result.get_return_value(), 7);
819        assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
820    }
821
822    #[test]
823    fn test_execute_result_with_outputs() {
824        let outputs = vec![OutputParam {
825            name: "ReturnValue".to_string(),
826            value: mssql_types::SqlValue::Int(100),
827        }];
828
829        let result = ExecuteResult::with_outputs(10, outputs);
830        assert_eq!(result.rows_affected, 10);
831        assert!(result.get_output("ReturnValue").is_some());
832        assert!(result.get_output("returnvalue").is_some()); // case-insensitive
833        assert!(result.get_output("NotFound").is_none());
834    }
835
836    #[test]
837    fn test_query_stream_columns() {
838        let columns = vec![Column {
839            name: "id".to_string(),
840            index: 0,
841            type_name: "INT".to_string(),
842            nullable: false,
843            max_length: Some(4),
844            precision: Some(0),
845            scale: Some(0),
846            collation: None,
847        }];
848
849        let stream = QueryStream::new(columns, Vec::new());
850        assert_eq!(stream.columns().len(), 1);
851        assert_eq!(stream.columns()[0].name, "id");
852        assert!(!stream.is_finished());
853    }
854
855    #[test]
856    fn test_query_stream_with_rows() {
857        use mssql_types::SqlValue;
858
859        let columns = vec![
860            Column {
861                name: "id".to_string(),
862                index: 0,
863                type_name: "INT".to_string(),
864                nullable: false,
865                max_length: Some(4),
866                precision: None,
867                scale: None,
868                collation: None,
869            },
870            Column {
871                name: "name".to_string(),
872                index: 1,
873                type_name: "NVARCHAR".to_string(),
874                nullable: true,
875                max_length: Some(100),
876                precision: None,
877                scale: None,
878                collation: None,
879            },
880        ];
881
882        let rows = vec![
883            Row::from_values(
884                columns.clone(),
885                vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
886            ),
887            Row::from_values(
888                columns.clone(),
889                vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
890            ),
891        ];
892
893        let mut stream = QueryStream::new(columns, rows);
894        assert_eq!(stream.columns().len(), 2);
895        assert_eq!(stream.rows_remaining(), 2);
896        assert!(!stream.is_finished());
897
898        // First row
899        let row1 = stream.try_next().unwrap();
900        assert_eq!(row1.get::<i32>(0).unwrap(), 1);
901        assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
902
903        // Second row
904        let row2 = stream.try_next().unwrap();
905        assert_eq!(row2.get::<i32>(0).unwrap(), 2);
906        assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
907
908        // No more rows
909        assert!(stream.try_next().is_none());
910        assert!(stream.is_finished());
911    }
912
913    #[test]
914    fn test_query_stream_iterator() {
915        use mssql_types::SqlValue;
916
917        let columns = vec![Column {
918            name: "val".to_string(),
919            index: 0,
920            type_name: "INT".to_string(),
921            nullable: false,
922            max_length: None,
923            precision: None,
924            scale: None,
925            collation: None,
926        }];
927
928        let rows = vec![
929            Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
930            Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
931            Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
932        ];
933
934        let mut stream = QueryStream::new(columns, rows);
935
936        // Use iterator — unwrap each Result so test failures are visible
937        // (QueryStream's Iterator impl always yields Ok, but we should
938        // not silently swallow errors if that ever changes)
939        let values: Vec<i32> = stream
940            .by_ref()
941            .map(|r| r.unwrap().get::<i32>(0).unwrap())
942            .collect();
943
944        assert_eq!(values, vec![10, 20, 30]);
945        assert!(stream.is_finished());
946    }
947
948    #[test]
949    fn test_query_stream_empty() {
950        let stream = QueryStream::empty();
951        assert!(stream.columns().is_empty());
952        assert_eq!(stream.rows_remaining(), 0);
953        assert!(stream.is_finished());
954    }
955
956    /// Exercises the lazy-decode path: rows are stored as raw TDS bytes and
957    /// decoded only when the caller pulls them. Mirrors what
958    /// `read_query_response` now produces and pins the contract between
959    /// `PendingRow::Raw` and the per-row decode in `poll_next`/`next`.
960    #[test]
961    fn test_query_stream_lazy_raw_row_decoding() {
962        use bytes::Bytes;
963        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
964        use tds_protocol::types::TypeId;
965
966        // Build raw row bytes for two columns: IntN(42) + IntN(NULL).
967        let mut data = Vec::new();
968        data.push(4); // IntN length prefix — 4 bytes
969        data.extend_from_slice(&42i32.to_le_bytes());
970        data.push(0); // IntN NULL (zero-length)
971
972        let meta = ColMetaData {
973            columns: vec![
974                ColumnData {
975                    name: "a".to_string(),
976                    type_id: TypeId::IntN,
977                    col_type: 0x26,
978                    flags: 0x00,
979                    user_type: 0,
980                    type_info: TypeInfo {
981                        max_length: Some(4),
982                        precision: None,
983                        scale: None,
984                        collation: None,
985                    },
986                    crypto_metadata: None,
987                },
988                ColumnData {
989                    name: "b".to_string(),
990                    type_id: TypeId::IntN,
991                    col_type: 0x26,
992                    flags: 0x01,
993                    user_type: 0,
994                    type_info: TypeInfo {
995                        max_length: Some(4),
996                        precision: None,
997                        scale: None,
998                        collation: None,
999                    },
1000                    crypto_metadata: None,
1001                },
1002            ],
1003            cek_table: None,
1004        };
1005
1006        let columns = vec![
1007            Column {
1008                name: "a".to_string(),
1009                index: 0,
1010                type_name: "INT".to_string(),
1011                nullable: false,
1012                max_length: Some(4),
1013                precision: None,
1014                scale: None,
1015                collation: None,
1016            },
1017            Column {
1018                name: "b".to_string(),
1019                index: 1,
1020                type_name: "INT".to_string(),
1021                nullable: true,
1022                max_length: Some(4),
1023                precision: None,
1024                scale: None,
1025                collation: None,
1026            },
1027        ];
1028
1029        let pending = vec![PendingRow::Raw(RawRow {
1030            data: Bytes::from(data),
1031        })];
1032
1033        #[cfg(feature = "always-encrypted")]
1034        let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1035        #[cfg(not(feature = "always-encrypted"))]
1036        let mut stream = QueryStream::from_raw(columns, pending, meta);
1037
1038        assert_eq!(stream.rows_remaining(), 1);
1039        let row = stream
1040            .next()
1041            .expect("one row pending")
1042            .expect("row decoded successfully");
1043        assert_eq!(row.get::<i32>(0).unwrap(), 42);
1044        assert!(row.is_null(1));
1045        assert!(stream.next().is_none());
1046        assert!(stream.is_finished());
1047    }
1048
1049    /// Decoder errors must surface per-row via `Stream`/`Iterator` without
1050    /// derailing the stream state. Truncated raw bytes trigger a decode
1051    /// error that the caller observes as `Some(Err(_))`.
1052    #[test]
1053    fn test_query_stream_lazy_decode_error_propagates() {
1054        use bytes::Bytes;
1055        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1056        use tds_protocol::types::TypeId;
1057
1058        // Declare an Int4 column but provide only 2 bytes — decode must fail.
1059        let data = vec![0x01u8, 0x02];
1060
1061        let meta = ColMetaData {
1062            columns: vec![ColumnData {
1063                name: "a".to_string(),
1064                type_id: TypeId::Int4,
1065                col_type: 0x38,
1066                flags: 0x00,
1067                user_type: 0,
1068                type_info: TypeInfo {
1069                    max_length: Some(4),
1070                    precision: None,
1071                    scale: None,
1072                    collation: None,
1073                },
1074                crypto_metadata: None,
1075            }],
1076            cek_table: None,
1077        };
1078
1079        let columns = vec![Column {
1080            name: "a".to_string(),
1081            index: 0,
1082            type_name: "INT".to_string(),
1083            nullable: false,
1084            max_length: Some(4),
1085            precision: None,
1086            scale: None,
1087            collation: None,
1088        }];
1089
1090        let pending = vec![PendingRow::Raw(RawRow {
1091            data: Bytes::from(data),
1092        })];
1093
1094        #[cfg(feature = "always-encrypted")]
1095        let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1096        #[cfg(not(feature = "always-encrypted"))]
1097        let mut stream = QueryStream::from_raw(columns, pending, meta);
1098
1099        let item = stream.next().expect("pending row present");
1100        assert!(item.is_err(), "truncated bytes must surface a decode error");
1101        assert!(stream.next().is_none());
1102    }
1103
1104    /// Helper to build a single-column IntN metadata block for the lazy
1105    /// `ResultSet` / `MultiResultStream` tests below.
1106    #[cfg(test)]
1107    fn intn_meta_and_columns(
1108        col_name: &str,
1109        nullable: bool,
1110    ) -> (tds_protocol::token::ColMetaData, Vec<Column>) {
1111        use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1112        use tds_protocol::types::TypeId;
1113        (
1114            ColMetaData {
1115                columns: vec![ColumnData {
1116                    name: col_name.to_string(),
1117                    type_id: TypeId::IntN,
1118                    col_type: 0x26,
1119                    flags: if nullable { 0x01 } else { 0x00 },
1120                    user_type: 0,
1121                    type_info: TypeInfo {
1122                        max_length: Some(4),
1123                        precision: None,
1124                        scale: None,
1125                        collation: None,
1126                    },
1127                    crypto_metadata: None,
1128                }],
1129                cek_table: None,
1130            },
1131            vec![Column {
1132                name: col_name.to_string(),
1133                index: 0,
1134                type_name: "INT".to_string(),
1135                nullable,
1136                max_length: Some(4),
1137                precision: None,
1138                scale: None,
1139                collation: None,
1140            }],
1141        )
1142    }
1143
1144    /// Exercises the `ResultSet` lazy-decode path introduced in 2.9.
1145    /// Mirrors `test_query_stream_lazy_raw_row_decoding` but via the
1146    /// result-set API that `call_procedure` / `query_multiple` expose.
1147    #[test]
1148    fn test_result_set_lazy_raw_row_decoding() {
1149        use bytes::Bytes;
1150        use tds_protocol::token::RawRow;
1151
1152        let (meta, columns) = intn_meta_and_columns("a", false);
1153
1154        // Two rows: 7 and 11 encoded as IntN(4).
1155        let pending = vec![
1156            PendingRow::Raw(RawRow {
1157                data: {
1158                    let mut b = Vec::with_capacity(5);
1159                    b.push(4);
1160                    b.extend_from_slice(&7i32.to_le_bytes());
1161                    Bytes::from(b)
1162                },
1163            }),
1164            PendingRow::Raw(RawRow {
1165                data: {
1166                    let mut b = Vec::with_capacity(5);
1167                    b.push(4);
1168                    b.extend_from_slice(&11i32.to_le_bytes());
1169                    Bytes::from(b)
1170                },
1171            }),
1172        ];
1173
1174        #[cfg(feature = "always-encrypted")]
1175        let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1176        #[cfg(not(feature = "always-encrypted"))]
1177        let mut rs = ResultSet::from_raw(columns, pending, meta);
1178
1179        assert_eq!(rs.rows_remaining(), 2);
1180        assert!(!rs.is_empty());
1181
1182        let row1 = rs.next_row().expect("row present").expect("decodes");
1183        assert_eq!(row1.get::<i32>(0).unwrap(), 7);
1184
1185        let row2 = rs.next_row().expect("row present").expect("decodes");
1186        assert_eq!(row2.get::<i32>(0).unwrap(), 11);
1187
1188        assert!(rs.next_row().is_none());
1189        assert!(rs.is_empty());
1190    }
1191
1192    /// Decoder errors in `ResultSet::next_row` must surface per-row without
1193    /// derailing further calls. Same contract as
1194    /// `test_query_stream_lazy_decode_error_propagates`.
1195    #[test]
1196    fn test_result_set_lazy_decode_error_propagates() {
1197        use bytes::Bytes;
1198        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1199        use tds_protocol::types::TypeId;
1200
1201        // Int4 (not IntN) with only 2 bytes → decode must fail.
1202        let meta = ColMetaData {
1203            columns: vec![ColumnData {
1204                name: "a".to_string(),
1205                type_id: TypeId::Int4,
1206                col_type: 0x38,
1207                flags: 0x00,
1208                user_type: 0,
1209                type_info: TypeInfo {
1210                    max_length: Some(4),
1211                    precision: None,
1212                    scale: None,
1213                    collation: None,
1214                },
1215                crypto_metadata: None,
1216            }],
1217            cek_table: None,
1218        };
1219        let columns = vec![Column {
1220            name: "a".to_string(),
1221            index: 0,
1222            type_name: "INT".to_string(),
1223            nullable: false,
1224            max_length: Some(4),
1225            precision: None,
1226            scale: None,
1227            collation: None,
1228        }];
1229
1230        let pending = vec![PendingRow::Raw(RawRow {
1231            data: Bytes::from(vec![0x01u8, 0x02]),
1232        })];
1233
1234        #[cfg(feature = "always-encrypted")]
1235        let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1236        #[cfg(not(feature = "always-encrypted"))]
1237        let mut rs = ResultSet::from_raw(columns, pending, meta);
1238
1239        let first = rs.next_row().expect("pending row present");
1240        assert!(
1241            first.is_err(),
1242            "truncated bytes must surface a decode error"
1243        );
1244        assert!(rs.next_row().is_none());
1245    }
1246
1247    /// `collect_all` on a lazy `ResultSet` decodes every pending row and
1248    /// propagates the first decode error. Ensures 2.9's signature change is
1249    /// exercised end-to-end.
1250    #[test]
1251    fn test_result_set_lazy_collect_all_success_and_error() {
1252        use bytes::Bytes;
1253        use tds_protocol::token::RawRow;
1254
1255        // Success: two rows decode cleanly.
1256        let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
1257        let pending_ok = vec![
1258            PendingRow::Raw(RawRow {
1259                data: {
1260                    let mut b = Vec::with_capacity(5);
1261                    b.push(4);
1262                    b.extend_from_slice(&10i32.to_le_bytes());
1263                    Bytes::from(b)
1264                },
1265            }),
1266            PendingRow::Raw(RawRow {
1267                data: {
1268                    let mut b = Vec::with_capacity(5);
1269                    b.push(4);
1270                    b.extend_from_slice(&20i32.to_le_bytes());
1271                    Bytes::from(b)
1272                },
1273            }),
1274        ];
1275
1276        #[cfg(feature = "always-encrypted")]
1277        let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok, None);
1278        #[cfg(not(feature = "always-encrypted"))]
1279        let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok);
1280        let rows = rs_ok.collect_all().expect("all rows decode");
1281        assert_eq!(rows.len(), 2);
1282        assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
1283        assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
1284        assert!(rs_ok.is_empty());
1285
1286        // Error: a truncated row (declared Int4 with only 2 bytes) makes
1287        // collect_all fail. collect_all short-circuits on the first Err.
1288        use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1289        use tds_protocol::types::TypeId;
1290        let meta_err = ColMetaData {
1291            columns: vec![ColumnData {
1292                name: "a".to_string(),
1293                type_id: TypeId::Int4,
1294                col_type: 0x38,
1295                flags: 0x00,
1296                user_type: 0,
1297                type_info: TypeInfo {
1298                    max_length: Some(4),
1299                    precision: None,
1300                    scale: None,
1301                    collation: None,
1302                },
1303                crypto_metadata: None,
1304            }],
1305            cek_table: None,
1306        };
1307        let cols_err = vec![Column {
1308            name: "a".to_string(),
1309            index: 0,
1310            type_name: "INT".to_string(),
1311            nullable: false,
1312            max_length: Some(4),
1313            precision: None,
1314            scale: None,
1315            collation: None,
1316        }];
1317        let pending_err = vec![PendingRow::Raw(RawRow {
1318            data: Bytes::from(vec![0x01u8, 0x02]),
1319        })];
1320
1321        #[cfg(feature = "always-encrypted")]
1322        let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err, None);
1323        #[cfg(not(feature = "always-encrypted"))]
1324        let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err);
1325        let err = rs_err.collect_all();
1326        assert!(err.is_err(), "collect_all must propagate decode error");
1327    }
1328
1329    /// `MultiResultStream` end-to-end lazy-decode path: two lazy `ResultSet`s
1330    /// decoded on demand as the caller walks through via `next_row` /
1331    /// `next_result`. Pins the 2.9 refactor of `read_multi_result_response`.
1332    #[tokio::test]
1333    async fn test_multi_result_stream_lazy_decode_across_result_sets() {
1334        use bytes::Bytes;
1335        use tds_protocol::token::RawRow;
1336
1337        let (meta1, cols1) = intn_meta_and_columns("a", false);
1338        let pending1 = vec![PendingRow::Raw(RawRow {
1339            data: {
1340                let mut b = Vec::with_capacity(5);
1341                b.push(4);
1342                b.extend_from_slice(&101i32.to_le_bytes());
1343                Bytes::from(b)
1344            },
1345        })];
1346        #[cfg(feature = "always-encrypted")]
1347        let rs1 = ResultSet::from_raw(cols1, pending1, meta1, None);
1348        #[cfg(not(feature = "always-encrypted"))]
1349        let rs1 = ResultSet::from_raw(cols1, pending1, meta1);
1350
1351        let (meta2, cols2) = intn_meta_and_columns("b", false);
1352        let pending2 = vec![PendingRow::Raw(RawRow {
1353            data: {
1354                let mut b = Vec::with_capacity(5);
1355                b.push(4);
1356                b.extend_from_slice(&202i32.to_le_bytes());
1357                Bytes::from(b)
1358            },
1359        })];
1360        #[cfg(feature = "always-encrypted")]
1361        let rs2 = ResultSet::from_raw(cols2, pending2, meta2, None);
1362        #[cfg(not(feature = "always-encrypted"))]
1363        let rs2 = ResultSet::from_raw(cols2, pending2, meta2);
1364
1365        let mut stream = MultiResultStream::new(vec![rs1, rs2]);
1366        assert_eq!(stream.result_count(), 2);
1367        assert_eq!(stream.current_result_index(), 0);
1368
1369        let row = stream
1370            .next_row()
1371            .await
1372            .expect("first row success")
1373            .expect("row present");
1374        assert_eq!(row.get::<i32>(0).unwrap(), 101);
1375        assert!(stream.next_row().await.expect("no more rows").is_none());
1376
1377        assert!(stream.has_more_results());
1378        assert!(stream.next_result().await.expect("advance ok"));
1379        assert_eq!(stream.current_result_index(), 1);
1380
1381        let row = stream
1382            .next_row()
1383            .await
1384            .expect("second row success")
1385            .expect("row present");
1386        assert_eq!(row.get::<i32>(0).unwrap(), 202);
1387    }
1388}